import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle import ParamAttr, reshape, transpose, concat, split
from paddle.nn.initializer import KaimingNormal
def channel_shuffle(x, groups):
    """Interleave the channels of ``x`` across ``groups`` groups.

    The channel axis (axis 1) is viewed as ``[groups, channels // groups]``,
    the two sub-axes are swapped, and the result is flattened back to the
    original 4-D layout.

    Args:
        x (Tensor): 4-D input; axis 1 is treated as the channel axis.
        groups (int): Number of groups the channel axis is divided into.

    Returns:
        Tensor: Tensor reshaped back to ``[batch, channels, height, width]``.
    """
    dyn_shape = paddle.shape(x)
    n, h, w = dyn_shape[0], dyn_shape[2], dyn_shape[3]
    # The channel count is read from the static shape, unlike the other dims.
    c = x.shape[1]
    per_group = c // groups

    grouped = reshape(x=x, shape=[n, groups, per_group, h, w])
    swapped = transpose(x=grouped, perm=[0, 2, 1, 3, 4])
    return reshape(x=swapped, shape=[n, c, h, w])
class ConvBNLayer(nn.Layer):
    """Bias-free ``Conv2D`` followed by ``nn.BatchNorm`` (optional activation).

    Args:
        in_channels (int): Input channels of the convolution.
        out_channels (int): Output channels of the convolution.
        kernel_size (int): Convolution kernel size.
        stride (int): Convolution stride.
        padding (int): Convolution padding.
        groups (int, optional): Convolution groups. Default: 1.
        act (str, optional): Activation forwarded to ``nn.BatchNorm``.
            Default: None.
        name (str, optional): Prefix used to name the parameters
            (``<name>_weights``, ``<name>_bn_scale``, ...). When None, the
            parameters are left unnamed so the framework assigns names.
            Default: None.
    """

    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            stride,
            padding,
            groups=1,
            act=None,
            name=None, ):
        super(ConvBNLayer, self).__init__()

        def _named(suffix):
            # ``name`` defaults to None; concatenating it with a suffix would
            # raise TypeError, so fall back to automatic naming instead.
            return None if name is None else name + suffix

        self._conv = nn.Conv2D(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=groups,
            weight_attr=ParamAttr(
                initializer=KaimingNormal(), name=_named("_weights")),
            bias_attr=False)
        self._batch_norm = nn.BatchNorm(
            out_channels,
            param_attr=ParamAttr(name=_named("_bn_scale")),
            bias_attr=ParamAttr(name=_named("_bn_offset")),
            act=act,
            moving_mean_name=_named("_bn_mean"),
            moving_variance_name=_named("_bn_variance"))

    def forward(self, inputs):
        """Apply the convolution, then batch norm (with its activation)."""
        y = self._conv(inputs)
        y = self._batch_norm(y)
        return y
class InvertedResidual(nn.Layer):
    """ShuffleNet unit that transforms only half of the input channels.

    The input is split in two along the channel axis; the second half goes
    through 1x1 conv -> 3x3 depthwise conv -> 1x1 conv, is concatenated with
    the untouched first half, and the result is channel-shuffled.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels of the output tensor.
        stride (int): Stride of the depthwise convolution.
        act (str, optional): Activation of the two 1x1 convolutions.
            Default: "relu".
        name (str): Used to build the parameter-name prefix
            ``'stage_' + name``; must be a string despite the None default.
    """

    def __init__(self, in_channels, out_channels, stride, act="relu",
                 name=None):
        super().__init__()
        prefix = 'stage_' + name
        half_out = out_channels // 2

        self._conv_pw = ConvBNLayer(
            in_channels=in_channels // 2,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name=prefix + '_conv1')
        # Depthwise: groups equals the channel count, no activation.
        self._conv_dw = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=half_out,
            act=None,
            name=prefix + '_conv2')
        self._conv_linear = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name=prefix + '_conv3')

    def forward(self, inputs):
        """Split, transform the second half, concatenate and shuffle."""
        half = inputs.shape[1] // 2
        identity, branch = split(
            inputs, num_or_sections=[half, half], axis=1)
        branch = self._conv_linear(self._conv_dw(self._conv_pw(branch)))
        merged = concat([identity, branch], axis=1)
        return channel_shuffle(merged, 2)
class InvertedResidualDS(nn.Layer):
    """ShuffleNet unit with two convolutional branches over the full input.

    Branch 1: 3x3 depthwise conv -> 1x1 conv.
    Branch 2: 1x1 conv -> 3x3 depthwise conv -> 1x1 conv.
    Each branch produces ``out_channels // 2`` channels; the two outputs are
    concatenated and channel-shuffled.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels of the output tensor.
        stride (int): Stride of both depthwise convolutions.
        act (str, optional): Activation of the 1x1 convolutions.
            Default: "relu".
        name (str): Used to build the parameter-name prefix
            ``'stage_' + name``; must be a string despite the None default.
    """

    def __init__(self, in_channels, out_channels, stride, act="relu",
                 name=None):
        super().__init__()
        half_out = out_channels // 2

        # branch 1
        self._conv_dw_1 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=in_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=in_channels,
            act=None,
            name='stage_' + name + '_conv4')
        self._conv_linear_1 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name='stage_' + name + '_conv5')

        # branch 2
        self._conv_pw_2 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name='stage_' + name + '_conv1')
        self._conv_dw_2 = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=half_out,
            act=None,
            name='stage_' + name + '_conv2')
        self._conv_linear_2 = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name='stage_' + name + '_conv3')

    def forward(self, inputs):
        """Run both branches on ``inputs``, concatenate and shuffle."""
        first = self._conv_linear_1(self._conv_dw_1(inputs))
        second = self._conv_pw_2(inputs)
        second = self._conv_dw_2(second)
        second = self._conv_linear_2(second)
        return channel_shuffle(concat([first, second], axis=1), 2)
class ShuffleNet(nn.Layer):
    """ShuffleNet-style backbone that returns a list of feature maps.

    Three stages of 4, 8 and 4 blocks follow a stride-2 conv and a stride-2
    max-pool. The first block of each stage is an ``InvertedResidualDS`` with
    stride 2; the rest are ``InvertedResidual`` with stride 1.

    Args:
        scale (float, optional): Width multiplier; one of 0.25, 0.33, 0.5,
            1.0, 1.5, 2.0. Default: 1.0.
        act (str, optional): Activation used in the conv layers.
            Default: "relu".
        in_channels (int, optional): Channels of the input image. Default: 3.
        pretrained (optional): Stored on the instance; not used by the code
            in this class. Default: None.

    Raises:
        NotImplementedError: If ``scale`` is not one of the supported values.
    """

    def __init__(self, scale=1.0, act="relu", in_channels=3, pretrained=None):
        super().__init__()
        self.scale = scale
        self.pretrained = pretrained
        stage_repeats = [4, 8, 4]

        # (scale, per-stage output channels); index 0 is a placeholder.
        channel_table = (
            (0.25, [-1, 24, 24, 48, 96, 512]),
            (0.33, [-1, 24, 32, 64, 128, 512]),
            (0.5, [-1, 24, 48, 96, 192, 1024]),
            (1.0, [-1, 24, 116, 232, 464, 1024]),
            (1.5, [-1, 24, 176, 352, 704, 1024]),
            (2.0, [-1, 24, 224, 488, 976, 2048]),
        )
        stage_out_channels = None
        for known_scale, channels in channel_table:
            if scale == known_scale:
                stage_out_channels = channels
                break
        if stage_out_channels is None:
            raise NotImplementedError("This scale size:[" + str(scale) +
                                      "] is not implemented!")

        # Indices (into the flat block list) of the last block of each stage.
        self.out_index = [3, 11, 15]
        self.feat_channels = stage_out_channels[1:5]

        # 1. conv1
        self._conv1 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=stage_out_channels[1],
            kernel_size=3,
            stride=2,
            padding=1,
            act=act,
            name='stage1_conv')
        self._max_pool = nn.MaxPool2D(kernel_size=3, stride=2, padding=1)

        # 2. bottleneck sequences
        self._block_list = []
        for stage_id, num_repeat in enumerate(stage_repeats):
            stage_out = stage_out_channels[stage_id + 2]
            for i in range(num_repeat):
                block_name = str(stage_id + 2) + '_' + str(i + 1)
                if i == 0:
                    sublayer = InvertedResidualDS(
                        in_channels=stage_out_channels[stage_id + 1],
                        out_channels=stage_out,
                        stride=2,
                        act=act,
                        name=block_name)
                else:
                    sublayer = InvertedResidual(
                        in_channels=stage_out,
                        out_channels=stage_out,
                        stride=1,
                        act=act,
                        name=block_name)
                # add_sublayer registers the block so its parameters are
                # tracked even though _block_list is a plain Python list.
                self._block_list.append(
                    self.add_sublayer(name=block_name, sublayer=sublayer))

    def forward(self, inputs):
        """Return the post-pool feature plus the outputs at ``out_index``."""
        y = self._max_pool(self._conv1(inputs))
        feat_list = [y]
        for idx, block in enumerate(self._block_list):
            y = block(y)
            if idx in self.out_index:
                feat_list.append(y)
        return feat_list
def ShuffleNetV2_x0_25(**kwargs):
    """Build a ``ShuffleNet`` with ``scale=0.25``.

    Args:
        **kwargs: Forwarded unchanged to ``ShuffleNet``.

    Returns:
        ShuffleNet: The constructed backbone.
    """
    return ShuffleNet(scale=0.25, **kwargs)
class ConvBNReLU(nn.Layer):
    """``Conv2D`` -> ``BatchNorm2D`` -> ``ReLU``.

    Args:
        in_channels (int): Input channels of the convolution.
        out_channels (int): Output channels of the convolution.
        kernel_size (int): Convolution kernel size.
        padding (str|int, optional): Convolution padding. Default: 'same'.
        **kwargs: Extra keyword arguments forwarded to ``nn.Conv2D``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 padding='same',
                 **kwargs):
        super().__init__()
        self._conv = nn.Conv2D(
            in_channels,
            out_channels,
            kernel_size,
            padding=padding,
            **kwargs)
        self._batch_norm = nn.BatchNorm2D(out_channels, data_format='NCHW')
        self._relu = nn.layer.activation.ReLU()

    def forward(self, x):
        """Apply convolution, batch norm and ReLU in sequence."""
        return self._relu(self._batch_norm(self._conv(x)))
class ConvBN(nn.Layer):
    """``Conv2D`` -> ``BatchNorm2D`` without an activation.

    Args:
        in_channels (int): Input channels of the convolution.
        out_channels (int): Output channels of the convolution.
        kernel_size (int): Convolution kernel size.
        padding (str|int, optional): Convolution padding. Default: 'same'.
        **kwargs: Extra keyword arguments forwarded to ``nn.Conv2D``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 padding='same',
                 **kwargs):
        super().__init__()
        self._conv = nn.Conv2D(
            in_channels,
            out_channels,
            kernel_size,
            padding=padding,
            **kwargs)
        self._batch_norm = nn.BatchNorm2D(out_channels, data_format='NCHW')

    def forward(self, x):
        """Apply convolution followed by batch norm."""
        return self._batch_norm(self._conv(x))
class UAFM(nn.Layer):
    """
    Base class of the Unified Attention Fusion Module.

    The low-level feature ``x`` is projected to ``y_ch`` channels, the
    high-level feature ``y`` is resized to the spatial size of ``x``, and the
    two are fused (by addition in this base class) and passed through an
    output convolution.

    Args:
        x_ch (int): Channels of the x tensor (low-level feature).
        y_ch (int): Channels of the y tensor (high-level feature).
        out_ch (int): Channels of the output tensor.
        ksize (int, optional): Kernel size of the conv applied to x. Default: 3.
        resize_mode (str, optional): Interpolation mode used to upsample y.
            Default: bilinear.
    """

    def __init__(self, x_ch, y_ch, out_ch, ksize=3, resize_mode='bilinear'):
        super().__init__()
        self.conv_x = ConvBNReLU(
            x_ch, y_ch, kernel_size=ksize, padding=ksize // 2, bias_attr=False)
        self.conv_out = ConvBNReLU(
            y_ch, out_ch, kernel_size=3, padding=1, bias_attr=False)
        self.resize_mode = resize_mode

    def check(self, x, y):
        """Unpack the trailing dims of both inputs; no other validation."""
        # The two-name unpacking fails unless shape[2:] has exactly two
        # entries; the unpacked values themselves are not used.
        _x_h, _x_w = x.shape[2:]
        _y_h, _y_w = y.shape[2:]

    def prepare(self, x, y):
        """Project x, then resize y to the projected x's spatial size."""
        low = self.prepare_x(x, y)
        high = self.prepare_y(low, y)
        return low, high

    def prepare_x(self, x, y):
        """Project the low-level feature with ``conv_x``."""
        return self.conv_x(x)

    def prepare_y(self, x, y):
        """Resize the high-level feature to the spatial size of ``x``."""
        target_hw = paddle.shape(x)[2:]
        return F.interpolate(y, target_hw, mode=self.resize_mode)

    def fuse(self, x, y):
        """Add the two features and apply the output convolution."""
        return self.conv_out(x + y)

    def forward(self, x, y):
        """
        Args:
            x (Tensor): The low-level feature.
            y (Tensor): The high-level feature.
        """
        self.check(x, y)
        low, high = self.prepare(x, y)
        return self.fuse(low, high)
def avg_reduce_channel(x):
    """Average over the channel axis (axis 1), keeping that axis.

    Args:
        x (Tensor|list|tuple): A tensor, or a list/tuple of tensors.

    Returns:
        Tensor: The channel mean of ``x``. For a list/tuple with more than
        one tensor, the per-tensor means concatenated along axis 1.
    """
    if not isinstance(x, (list, tuple)):
        return paddle.mean(x, axis=1, keepdim=True)

    means = [paddle.mean(item, axis=1, keepdim=True) for item in x]
    if len(means) == 1:
        return means[0]
    return paddle.concat(means, axis=1)
class UAFM_SpAtten_S(UAFM):
    """
    UAFM with spatial attention computed from channel means.

    Args:
        x_ch (int): Channels of the x tensor (low-level feature).
        y_ch (int): Channels of the y tensor (high-level feature).
        out_ch (int): Channels of the output tensor.
        ksize (int, optional): Kernel size of the conv applied to x. Default: 3.
        resize_mode (str, optional): Interpolation mode used to upsample y.
            Default: bilinear.
    """

    def __init__(self, x_ch, y_ch, out_ch, ksize=3, resize_mode='bilinear'):
        super().__init__(x_ch, y_ch, out_ch, ksize, resize_mode)
        # 2 input channels (one channel-mean per feature) -> 1 attention map.
        squeeze = ConvBNReLU(2, 2, kernel_size=3, padding=1, bias_attr=False)
        project = ConvBN(2, 1, kernel_size=3, padding=1, bias_attr=False)
        self.conv_xy_atten = nn.Sequential(squeeze, project)

    def fuse(self, x, y):
        """
        Blend x and y with a sigmoid attention map, then apply ``conv_out``.

        Args:
            x (Tensor): The low-level feature.
            y (Tensor): The high-level feature.
        """
        stats = avg_reduce_channel([x, y])
        atten = F.sigmoid(self.conv_xy_atten(stats))
        blended = x * atten + y * (1 - atten)
        return self.conv_out(blended)
class PPLiteSegRknn(nn.Layer):
    """
    The PP_LiteSeg implementation based on PaddlePaddle, adapted for RKNN models.

    The original article refers to "Juncai Peng, Yi Liu, Shiyu Tang, Yuying Hao, Lutao Chu,
    Guowei Chen, Zewu Wu, Zeyu Chen, Zhiliang Yu, Yuning Du, Qingqing Dang,Baohua Lai,
    Qiwen Liu, Xiaoguang Hu, Dianhai Yu, Yanjun Ma. PP-LiteSeg: A Superior Real-Time Semantic
    Segmentation Model. https://arxiv.org/abs/2204.02681".

    Args:
        num_classes (int): The number of target classes.
        backbone (nn.Layer): Backbone network. It must expose ``feat_channels`` and its
            forward must return a list of feature maps indexable by ``backbone_indices``.
        backbone_indices (List(int), optional): Indices of the backbone outputs that are
            fed to the head. At least two are required. Default: [1, 2, 3].
        cm_bin_sizes (List(int), optional): The bin sizes of the context module.
            Default: [1, 2, 3].
        cm_out_ch (int, optional): The output channels of the context module. Default: 128.
        arm_out_chs (List(int), optional): The output channels of each fusion module. A
            single-element list is repeated for every selected feature. Default: [32, 64, 96].
        seg_head_inter_chs (List(int), optional): The intermediate channels of each
            segmentation head. A single-element list is repeated for every selected
            feature. Default: [64, 64, 64].
        resize_mode (str, optional): The resize mode for the upsampling operation in the
            decoder. Default: bilinear.
    """

    def __init__(self,
                 num_classes,
                 backbone,
                 backbone_indices=[1, 2, 3],
                 cm_bin_sizes=[1, 2, 3],
                 cm_out_ch=128,
                 arm_out_chs=[32, 64, 96],
                 seg_head_inter_chs=[64, 64, 64],
                 resize_mode='bilinear'):
        super().__init__()

        # backbone
        assert hasattr(backbone, 'feat_channels'), \
            "The backbone should has feat_channels."
        assert len(backbone.feat_channels) >= len(backbone_indices), \
            f"The length of input backbone_indices ({len(backbone_indices)}) should not be " \
            f"greater than the length of feat_channels ({len(backbone.feat_channels)})."
        assert len(backbone.feat_channels) > max(backbone_indices), \
            f"The max value ({max(backbone_indices)}) of backbone_indices should be " \
            f"less than the length of feat_channels ({len(backbone.feat_channels)})."
        self.backbone = backbone

        assert len(backbone_indices) > 1, "The length of backbone_indices " \
            "should be greater than 1"
        # Copy so the instance never aliases the shared default list.
        self.backbone_indices = list(backbone_indices)
        backbone_out_chs = [backbone.feat_channels[i] for i in backbone_indices]

        # head
        if len(arm_out_chs) == 1:
            arm_out_chs = arm_out_chs * len(backbone_indices)
        assert len(arm_out_chs) == len(backbone_indices), "The length of " \
            "arm_out_chs and backbone_indices should be equal"

        self.ppseg_head = PPLiteSegHead(backbone_out_chs, arm_out_chs,
                                        cm_bin_sizes, cm_out_ch,
                                        resize_mode)

        if len(seg_head_inter_chs) == 1:
            seg_head_inter_chs = seg_head_inter_chs * len(backbone_indices)
        assert len(seg_head_inter_chs) == len(backbone_indices), "The length of " \
            "seg_head_inter_chs and backbone_indices should be equal"
        # One segmentation head per selected backbone feature, in the same order.
        self.seg_heads = nn.LayerList()
        for in_ch, mid_ch in zip(arm_out_chs, seg_head_inter_chs):
            self.seg_heads.append(SegHead(in_ch, mid_ch, num_classes))

    def forward(self, x):
        """
        Args:
            x (Tensor): Input image batch; its dims from index 2 on are used as the
                output size of the logits.

        Returns:
            List(Tensor): In training mode, one upsampled logit per head. In eval mode,
            a single-element list with the upsampled logit of the first head.
        """
        x_hw = paddle.shape(x)[2:]

        feats_backbone = self.backbone(x)
        assert len(feats_backbone) >= len(self.backbone_indices), \
            f"The nums of backbone feats ({len(feats_backbone)}) should be greater or " \
            f"equal than the nums of backbone_indices ({len(self.backbone_indices)})"

        feats_selected = [feats_backbone[i] for i in self.backbone_indices]
        feats_head = self.ppseg_head(feats_selected)

        if self.training:
            logit_list = [
                F.interpolate(
                    seg_head(feat), x_hw, mode='bilinear', align_corners=False)
                for feat, seg_head in zip(feats_head, self.seg_heads)
            ]
        else:
            # Inference only needs the first head.
            logit = self.seg_heads[0](feats_head[0])
            logit = F.interpolate(
                logit, x_hw, mode='bilinear', align_corners=False)
            logit_list = [logit]

        return logit_list
class PPLiteSegHead(nn.Layer):
    """
    The decoder head of PPLiteSegRknn.

    A context module processes the last input feature; its output is then
    fused with the input features one by one, from last to first, each through
    its own ``UAFM_SpAtten_S`` module.

    Args:
        backbone_out_chs (List(int)): Channels of the selected backbone outputs.
        arm_out_chs (List(int)): Output channels of each fusion module.
        cm_bin_sizes (List(int)): Bin sizes of the context module.
        cm_out_ch (int): Output channels of the context module.
        resize_mode (str): Resize mode for the upsampling in the fusion modules.
    """

    def __init__(self, backbone_out_chs, arm_out_chs, cm_bin_sizes, cm_out_ch, resize_mode):
        super().__init__()

        self.cm = PPContextModule(backbone_out_chs[-1], cm_out_ch, cm_out_ch,
                                  cm_bin_sizes)

        self.arm_list = nn.LayerList()
        last_idx = len(backbone_out_chs) - 1
        for idx, low_ch in enumerate(backbone_out_chs):
            # The last module takes the context output as its high-level
            # input; every other one takes the output of the next module.
            high_ch = cm_out_ch if idx == last_idx else arm_out_chs[idx + 1]
            self.arm_list.append(
                UAFM_SpAtten_S(
                    low_ch,
                    high_ch,
                    arm_out_chs[idx],
                    ksize=3,
                    resize_mode=resize_mode))

    def forward(self, in_feat_list):
        """
        Args:
            in_feat_list (List(Tensor)): Selected backbone features.

        Returns:
            out_feat_list (List(Tensor)): Fused features, in the same order and
                with the same length as ``in_feat_list``.
        """
        high_feat = self.cm(in_feat_list[-1])
        fused = []
        for idx in range(len(in_feat_list) - 1, -1, -1):
            high_feat = self.arm_list[idx](in_feat_list[idx], high_feat)
            fused.append(high_feat)
        # Features were produced last-to-first; restore the input order.
        fused.reverse()
        return fused
class PPContextModule(nn.Layer):
    """
    Simple context module.

    Each stage adaptively average-pools the input to one bin size and applies
    a 1x1 ConvBNReLU. The stage outputs are resized back to the input's
    spatial size, summed, and passed through a 3x3 ConvBNReLU.

    Args:
        in_channels (int): Input channels of the module.
        inter_channels (int): Output channels of each pooling stage.
        out_channels (int): Output channels of the module.
        bin_sizes (tuple|list): Output sizes of the adaptive pooling stages.
        align_corners (bool, optional): Forwarded to ``F.interpolate``.
            Default: False.
    """

    def __init__(self,
                 in_channels,
                 inter_channels,
                 out_channels,
                 bin_sizes,
                 align_corners=False):
        super().__init__()

        stage_layers = [
            self._make_stage(in_channels, inter_channels, size)
            for size in bin_sizes
        ]
        self.stages = nn.LayerList(stage_layers)

        self.conv_out = ConvBNReLU(
            in_channels=inter_channels,
            out_channels=out_channels,
            kernel_size=3,
            padding=1)

        self.align_corners = align_corners

    def _make_stage(self, in_channels, out_channels, size):
        """Build one stage: adaptive average pool, then 1x1 ConvBNReLU."""
        pool = nn.AdaptiveAvgPool2D(output_size=size)
        proj = ConvBNReLU(
            in_channels=in_channels, out_channels=out_channels, kernel_size=1)
        return nn.Sequential(pool, proj)

    def forward(self, input):
        """Sum the upsampled stage outputs and apply the output conv."""
        target_hw = paddle.shape(input)[2:]
        out = None

        for stage in self.stages:
            pooled = stage(input)
            upsampled = F.interpolate(
                pooled,
                target_hw,
                mode='bilinear',
                align_corners=self.align_corners)
            if out is None:
                out = upsampled
            else:
                out += upsampled

        return self.conv_out(out)
class SegHead(nn.Layer):
    """Segmentation head: 3x3 ConvBNReLU followed by a bias-free 1x1 conv.

    Args:
        in_chan (int): Input channels.
        mid_chan (int): Channels between the two convolutions.
        n_classes (int): Output channels of the final 1x1 convolution.
    """

    def __init__(self, in_chan, mid_chan, n_classes):
        super().__init__()
        self.conv = ConvBNReLU(
            in_chan,
            mid_chan,
            kernel_size=3,
            stride=1,
            padding=1,
            bias_attr=False)
        self.conv_out = nn.Conv2D(
            mid_chan, n_classes, kernel_size=1, bias_attr=False)

    def forward(self, x):
        """Return the per-class logits for ``x``."""
        return self.conv_out(self.conv(x))
if __name__ == "__main__":
    # Smoke test: build the model, report parameter counts and measure the
    # average eval-mode forward latency on a random input.
    import time
    import warnings

    import numpy as np
    from paddle.static import InputSpec

    warnings.filterwarnings('ignore')

    net = PPLiteSegRknn(num_classes=3, backbone=ShuffleNetV2_x0_25())

    total = sum([np.prod(param.shape) for param in net.parameters()])
    print(f"模型参数量:{total / 1e6} M")
    encoder_params = sum([np.prod(p.shape) for p in net.backbone.parameters()])
    print(f'encoder参数量:{encoder_params / 1e6} M')

    net.eval()
    inputTensor = paddle.rand((1, 3, 768, 1024))
    outputTensor = None

    test_num = 10
    count = 0
    cost_time = 0.0
    # Inference only: no_grad avoids recording autograd state while timing.
    with paddle.no_grad():
        for i in range(test_num):
            count += 1
            # perf_counter is monotonic and high-resolution, unlike time.time.
            t1 = time.perf_counter()
            outputTensor = net(inputTensor)
            t2 = time.perf_counter()
            # The first iteration is treated as warm-up and not timed.
            if count > 1:
                cost_time += (t2 - t1) * 1000

    # Guard the average so a single-iteration run does not divide by zero.
    if count > 1:
        print(f'average cost time:{cost_time / (count - 1)}ms')
    print(f'len(outputTensor)={len(outputTensor)}')
    print(f'type(outputTensor)={type(outputTensor)}')
    print(f'type(outputTensor[0])={type(outputTensor[0])}')
    print(f'outputTensor[0].shape={outputTensor[0].shape}')
# v2 version
from paddle import ParamAttr, reshape, transpose, concat, split
from paddle.nn.initializer import KaimingNormal
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
class CostomAdaptiveAvgPool2D(nn.Layer):
    """Adaptive average pooling built from fixed-window ``F.avg_pool2d`` calls.

    The input size is supplied at construction time, so every pooling window
    is computed from Python numbers rather than from the tensor's shape.

    Args:
        output_size (int|tuple|list): Target output size; an int is used for
            both height and width.
        input_size (tuple|list): ``(height, width)`` of the tensors that will
            be passed to ``forward``.
    """

    def __init__(self, output_size, input_size):
        super(CostomAdaptiveAvgPool2D, self).__init__()
        self.output_size = output_size
        self.input_size = input_size

    def forward(self, x):
        """Pool ``x`` to ``output_size``, one output cell at a time.

        Args:
            x (Tensor): Input sliced on axes 2 and 3; presumably its spatial
                size equals ``input_size`` (not checked here).

        Returns:
            Tensor: Per-cell averages concatenated along the last two axes.
        """
        # Local import: the previous code relied on ``np``, which this module
        # only imports inside its ``__main__`` block.
        import math

        H_in, W_in = self.input_size
        H_out, W_out = [self.output_size, self.output_size] \
            if isinstance(self.output_size, int) \
            else self.output_size

        rows = []
        for i in range(H_out):
            # Window bounds: floor of the start, ceil of the end.
            hs = int(math.floor(i * H_in / H_out))
            he = int(math.ceil((i + 1) * H_in / H_out))
            cells = []
            for j in range(W_out):
                ws = int(math.floor(j * W_in / W_out))
                we = int(math.ceil((j + 1) * W_in / W_out))
                kernel_size = [he - hs, we - ws]
                # The kernel covers the whole slice, so each call yields the
                # average of one window.
                cells.append(
                    F.avg_pool2d(x[:, :, hs:he, ws:we], kernel_size))
            rows.append(paddle.concat(cells, -1))
        return paddle.concat(rows, -2)
def channel_shuffle(x, groups):
    """Interleave the channels of ``x`` across ``groups`` groups.

    The channel axis (axis 1) is viewed as ``[groups, channels // groups]``,
    the two sub-axes are swapped, and the result is flattened back to the
    original 4-D layout.

    Args:
        x (Tensor): 4-D input; axis 1 is treated as the channel axis.
        groups (int): Number of groups the channel axis is divided into.

    Returns:
        Tensor: Tensor reshaped back to ``[batch, channels, height, width]``.
    """
    dyn_shape = paddle.shape(x)
    n, h, w = dyn_shape[0], dyn_shape[2], dyn_shape[3]
    # The channel count is read from the static shape, unlike the other dims.
    c = x.shape[1]
    per_group = c // groups

    grouped = reshape(x=x, shape=[n, groups, per_group, h, w])
    swapped = transpose(x=grouped, perm=[0, 2, 1, 3, 4])
    return reshape(x=swapped, shape=[n, c, h, w])
class ConvBNLayer(nn.Layer):
    """Bias-free ``Conv2D`` followed by ``nn.BatchNorm`` (optional activation).

    Args:
        in_channels (int): Input channels of the convolution.
        out_channels (int): Output channels of the convolution.
        kernel_size (int): Convolution kernel size.
        stride (int): Convolution stride.
        padding (int): Convolution padding.
        groups (int, optional): Convolution groups. Default: 1.
        act (str, optional): Activation forwarded to ``nn.BatchNorm``.
            Default: None.
        name (str, optional): Prefix used to name the parameters
            (``<name>_weights``, ``<name>_bn_scale``, ...). When None, the
            parameters are left unnamed so the framework assigns names.
            Default: None.
    """

    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            stride,
            padding,
            groups=1,
            act=None,
            name=None, ):
        super(ConvBNLayer, self).__init__()

        def _named(suffix):
            # ``name`` defaults to None; concatenating it with a suffix would
            # raise TypeError, so fall back to automatic naming instead.
            return None if name is None else name + suffix

        self._conv = nn.Conv2D(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=groups,
            weight_attr=ParamAttr(
                initializer=KaimingNormal(), name=_named("_weights")),
            bias_attr=False)
        self._batch_norm = nn.BatchNorm(
            out_channels,
            param_attr=ParamAttr(name=_named("_bn_scale")),
            bias_attr=ParamAttr(name=_named("_bn_offset")),
            act=act,
            moving_mean_name=_named("_bn_mean"),
            moving_variance_name=_named("_bn_variance"))

    def forward(self, inputs):
        """Apply the convolution, then batch norm (with its activation)."""
        y = self._conv(inputs)
        y = self._batch_norm(y)
        return y
class InvertedResidual(nn.Layer):
    """ShuffleNet unit that transforms only half of the input channels.

    The input is split in two along the channel axis; the second half goes
    through 1x1 conv -> 3x3 depthwise conv -> 1x1 conv, is concatenated with
    the untouched first half, and the result is channel-shuffled.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels of the output tensor.
        stride (int): Stride of the depthwise convolution.
        act (str, optional): Activation of the two 1x1 convolutions.
            Default: "relu".
        name (str): Used to build the parameter-name prefix
            ``'stage_' + name``; must be a string despite the None default.
    """

    def __init__(self, in_channels, out_channels, stride, act="relu",
                 name=None):
        super().__init__()
        prefix = 'stage_' + name
        half_out = out_channels // 2

        self._conv_pw = ConvBNLayer(
            in_channels=in_channels // 2,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name=prefix + '_conv1')
        # Depthwise: groups equals the channel count, no activation.
        self._conv_dw = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=half_out,
            act=None,
            name=prefix + '_conv2')
        self._conv_linear = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name=prefix + '_conv3')

    def forward(self, inputs):
        """Split, transform the second half, concatenate and shuffle."""
        half = inputs.shape[1] // 2
        identity, branch = split(
            inputs, num_or_sections=[half, half], axis=1)
        branch = self._conv_linear(self._conv_dw(self._conv_pw(branch)))
        merged = concat([identity, branch], axis=1)
        return channel_shuffle(merged, 2)
class InvertedResidualDS(nn.Layer):
    """ShuffleNet unit with two convolutional branches over the full input.

    Branch 1: 3x3 depthwise conv -> 1x1 conv.
    Branch 2: 1x1 conv -> 3x3 depthwise conv -> 1x1 conv.
    Each branch produces ``out_channels // 2`` channels; the two outputs are
    concatenated and channel-shuffled.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels of the output tensor.
        stride (int): Stride of both depthwise convolutions.
        act (str, optional): Activation of the 1x1 convolutions.
            Default: "relu".
        name (str): Used to build the parameter-name prefix
            ``'stage_' + name``; must be a string despite the None default.
    """

    def __init__(self, in_channels, out_channels, stride, act="relu",
                 name=None):
        super().__init__()
        half_out = out_channels // 2

        # branch 1
        self._conv_dw_1 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=in_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=in_channels,
            act=None,
            name='stage_' + name + '_conv4')
        self._conv_linear_1 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name='stage_' + name + '_conv5')

        # branch 2
        self._conv_pw_2 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name='stage_' + name + '_conv1')
        self._conv_dw_2 = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=half_out,
            act=None,
            name='stage_' + name + '_conv2')
        self._conv_linear_2 = ConvBNLayer(
            in_channels=half_out,
            out_channels=half_out,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
            act=act,
            name='stage_' + name + '_conv3')

    def forward(self, inputs):
        """Run both branches on ``inputs``, concatenate and shuffle."""
        first = self._conv_linear_1(self._conv_dw_1(inputs))
        second = self._conv_pw_2(inputs)
        second = self._conv_dw_2(second)
        second = self._conv_linear_2(second)
        return channel_shuffle(concat([first, second], axis=1), 2)
class ShuffleNet(nn.Layer):
    """ShuffleNet-style backbone that returns a list of feature maps.

    Three stages of 4, 8 and 4 blocks follow a stride-2 conv and a stride-2
    max-pool. The first block of each stage is an ``InvertedResidualDS`` with
    stride 2; the rest are ``InvertedResidual`` with stride 1.

    Args:
        scale (float, optional): Width multiplier; one of 0.25, 0.33, 0.5,
            1.0, 1.5, 2.0. Default: 1.0.
        act (str, optional): Activation used in the conv layers.
            Default: "relu".
        in_channels (int, optional): Channels of the input image. Default: 3.
        pretrained (optional): Stored on the instance; not used by the code
            in this class. Default: None.

    Raises:
        NotImplementedError: If ``scale`` is not one of the supported values.
    """

    def __init__(self, scale=1.0, act="relu", in_channels=3, pretrained=None):
        super().__init__()
        self.scale = scale
        self.pretrained = pretrained
        stage_repeats = [4, 8, 4]

        # (scale, per-stage output channels); index 0 is a placeholder.
        channel_table = (
            (0.25, [-1, 24, 24, 48, 96, 512]),
            (0.33, [-1, 24, 32, 64, 128, 512]),
            (0.5, [-1, 24, 48, 96, 192, 1024]),
            (1.0, [-1, 24, 116, 232, 464, 1024]),
            (1.5, [-1, 24, 176, 352, 704, 1024]),
            (2.0, [-1, 24, 224, 488, 976, 2048]),
        )
        stage_out_channels = None
        for known_scale, channels in channel_table:
            if scale == known_scale:
                stage_out_channels = channels
                break
        if stage_out_channels is None:
            raise NotImplementedError("This scale size:[" + str(scale) +
                                      "] is not implemented!")

        # Indices (into the flat block list) of the last block of each stage.
        self.out_index = [3, 11, 15]
        self.feat_channels = stage_out_channels[1:5]

        # 1. conv1
        self._conv1 = ConvBNLayer(
            in_channels=in_channels,
            out_channels=stage_out_channels[1],
            kernel_size=3,
            stride=2,
            padding=1,
            act=act,
            name='stage1_conv')
        self._max_pool = nn.MaxPool2D(kernel_size=3, stride=2, padding=1)

        # 2. bottleneck sequences
        self._block_list = []
        for stage_id, num_repeat in enumerate(stage_repeats):
            stage_out = stage_out_channels[stage_id + 2]
            for i in range(num_repeat):
                block_name = str(stage_id + 2) + '_' + str(i + 1)
                if i == 0:
                    sublayer = InvertedResidualDS(
                        in_channels=stage_out_channels[stage_id + 1],
                        out_channels=stage_out,
                        stride=2,
                        act=act,
                        name=block_name)
                else:
                    sublayer = InvertedResidual(
                        in_channels=stage_out,
                        out_channels=stage_out,
                        stride=1,
                        act=act,
                        name=block_name)
                # add_sublayer registers the block so its parameters are
                # tracked even though _block_list is a plain Python list.
                self._block_list.append(
                    self.add_sublayer(name=block_name, sublayer=sublayer))

    def forward(self, inputs):
        """Return the post-pool feature plus the outputs at ``out_index``."""
        y = self._max_pool(self._conv1(inputs))
        feat_list = [y]
        for idx, block in enumerate(self._block_list):
            y = block(y)
            if idx in self.out_index:
                feat_list.append(y)
        return feat_list
def ShuffleNetV2_x0_25(**kwargs):
    """Build a ``ShuffleNet`` with ``scale=0.25``.

    Args:
        **kwargs: Forwarded unchanged to ``ShuffleNet``.

    Returns:
        ShuffleNet: The constructed backbone.
    """
    return ShuffleNet(scale=0.25, **kwargs)
class ConvBNReLU(nn.Layer):
    """``Conv2D`` -> ``BatchNorm2D`` -> ``ReLU``.

    Args:
        in_channels (int): Input channels of the convolution.
        out_channels (int): Output channels of the convolution.
        kernel_size (int): Convolution kernel size.
        padding (str|int, optional): Convolution padding. Default: 'same'.
        **kwargs: Extra keyword arguments forwarded to ``nn.Conv2D``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 padding='same',
                 **kwargs):
        super().__init__()
        self._conv = nn.Conv2D(
            in_channels,
            out_channels,
            kernel_size,
            padding=padding,
            **kwargs)
        self._batch_norm = nn.BatchNorm2D(out_channels, data_format='NCHW')
        self._relu = nn.layer.activation.ReLU()

    def forward(self, x):
        """Apply convolution, batch norm and ReLU in sequence."""
        return self._relu(self._batch_norm(self._conv(x)))
class ConvBN(nn.Layer):
    """``Conv2D`` -> ``BatchNorm2D`` without an activation.

    Args:
        in_channels (int): Input channels of the convolution.
        out_channels (int): Output channels of the convolution.
        kernel_size (int): Convolution kernel size.
        padding (str|int, optional): Convolution padding. Default: 'same'.
        **kwargs: Extra keyword arguments forwarded to ``nn.Conv2D``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 padding='same',
                 **kwargs):
        super().__init__()
        self._conv = nn.Conv2D(
            in_channels,
            out_channels,
            kernel_size,
            padding=padding,
            **kwargs)
        self._batch_norm = nn.BatchNorm2D(out_channels, data_format='NCHW')

    def forward(self, x):
        """Apply convolution followed by batch norm."""
        return self._batch_norm(self._conv(x))
class UAFM(nn.Layer):
    """
    Base class of the Unified Attention Fusion Module.

    The low-level feature ``x`` is projected to ``y_ch`` channels, the
    high-level feature ``y`` is resized to the spatial size of ``x``, and the
    two are fused (by addition in this base class) and passed through an
    output convolution.

    Args:
        x_ch (int): Channels of the x tensor (low-level feature).
        y_ch (int): Channels of the y tensor (high-level feature).
        out_ch (int): Channels of the output tensor.
        ksize (int, optional): Kernel size of the conv applied to x. Default: 3.
        resize_mode (str, optional): Interpolation mode used to upsample y.
            Default: bilinear.
    """

    def __init__(self, x_ch, y_ch, out_ch, ksize=3, resize_mode='bilinear'):
        super().__init__()
        self.conv_x = ConvBNReLU(
            x_ch, y_ch, kernel_size=ksize, padding=ksize // 2, bias_attr=False)
        self.conv_out = ConvBNReLU(
            y_ch, out_ch, kernel_size=3, padding=1, bias_attr=False)
        self.resize_mode = resize_mode

    def check(self, x, y):
        """Unpack the trailing dims of both inputs; no other validation."""
        # The two-name unpacking fails unless shape[2:] has exactly two
        # entries; the unpacked values themselves are not used.
        _x_h, _x_w = x.shape[2:]
        _y_h, _y_w = y.shape[2:]

    def prepare(self, x, y):
        """Project x, then resize y to the projected x's spatial size."""
        low = self.prepare_x(x, y)
        high = self.prepare_y(low, y)
        return low, high

    def prepare_x(self, x, y):
        """Project the low-level feature with ``conv_x``."""
        return self.conv_x(x)

    def prepare_y(self, x, y):
        """Resize the high-level feature to the spatial size of ``x``."""
        target_hw = paddle.shape(x)[2:]
        return F.interpolate(y, target_hw, mode=self.resize_mode)

    def fuse(self, x, y):
        """Add the two features and apply the output convolution."""
        return self.conv_out(x + y)

    def forward(self, x, y):
        """
        Args:
            x (Tensor): The low-level feature.
            y (Tensor): The high-level feature.
        """
        self.check(x, y)
        low, high = self.prepare(x, y)
        return self.fuse(low, high)
def avg_reduce_channel(x):
    """Average over the channel axis (axis 1), keeping that axis.

    Args:
        x (Tensor|list|tuple): A tensor, or a list/tuple of tensors.

    Returns:
        Tensor: The channel mean of ``x``. For a list/tuple with more than
        one tensor, the per-tensor means concatenated along axis 1.
    """
    if not isinstance(x, (list, tuple)):
        return paddle.mean(x, axis=1, keepdim=True)

    means = [paddle.mean(item, axis=1, keepdim=True) for item in x]
    if len(means) == 1:
        return means[0]
    return paddle.concat(means, axis=1)
class UAFM_SpAtten_S(UAFM):
    """
    UAFM with spatial attention computed from channel means.

    Args:
        x_ch (int): Channels of the x tensor (low-level feature).
        y_ch (int): Channels of the y tensor (high-level feature).
        out_ch (int): Channels of the output tensor.
        ksize (int, optional): Kernel size of the conv applied to x. Default: 3.
        resize_mode (str, optional): Interpolation mode used to upsample y.
            Default: bilinear.
    """

    def __init__(self, x_ch, y_ch, out_ch, ksize=3, resize_mode='bilinear'):
        super().__init__(x_ch, y_ch, out_ch, ksize, resize_mode)
        # 2 input channels (one channel-mean per feature) -> 1 attention map.
        squeeze = ConvBNReLU(2, 2, kernel_size=3, padding=1, bias_attr=False)
        project = ConvBN(2, 1, kernel_size=3, padding=1, bias_attr=False)
        self.conv_xy_atten = nn.Sequential(squeeze, project)

    def fuse(self, x, y):
        """
        Blend x and y with a sigmoid attention map, then apply ``conv_out``.

        Args:
            x (Tensor): The low-level feature.
            y (Tensor): The high-level feature.
        """
        stats = avg_reduce_channel([x, y])
        atten = F.sigmoid(self.conv_xy_atten(stats))
        blended = x * atten + y * (1 - atten)
        return self.conv_out(blended)
class PPLiteSegRknn(nn.Layer):
    """
    The PP_LiteSeg implementation based on PaddlePaddle, adapted for RKNN models.

    The original article refers to "Juncai Peng, Yi Liu, Shiyu Tang, Yuying Hao, Lutao Chu,
    Guowei Chen, Zewu Wu, Zeyu Chen, Zhiliang Yu, Yuning Du, Qingqing Dang,Baohua Lai,
    Qiwen Liu, Xiaoguang Hu, Dianhai Yu, Yanjun Ma. PP-LiteSeg: A Superior Real-Time Semantic
    Segmentation Model. https://arxiv.org/abs/2204.02681".

    Args:
        num_classes (int): The number of target classes.
        backbone (nn.Layer): Backbone network. It must expose ``feat_channels`` and its
            forward must return a list of feature maps indexable by ``backbone_indices``.
        backbone_indices (List(int), optional): Indices of the backbone outputs that are
            fed to the head. At least two are required. Default: [1, 2, 3].
        cm_bin_sizes (List(int), optional): The bin sizes of the context module.
            Default: [1, 2, 3].
        cm_out_ch (int, optional): The output channels of the context module. Default: 128.
        arm_out_chs (List(int), optional): The output channels of each fusion module. A
            single-element list is repeated for every selected feature. Default: [32, 64, 96].
        seg_head_inter_chs (List(int), optional): The intermediate channels of each
            segmentation head. A single-element list is repeated for every selected
            feature. Default: [64, 64, 64].
        resize_mode (str, optional): The resize mode for the upsampling operation in the
            decoder. Default: bilinear.
    """

    def __init__(self,
                 num_classes,
                 backbone,
                 backbone_indices=[1, 2, 3],
                 cm_bin_sizes=[1, 2, 3],
                 cm_out_ch=128,
                 arm_out_chs=[32, 64, 96],
                 seg_head_inter_chs=[64, 64, 64],
                 resize_mode='bilinear'):
        super().__init__()

        # backbone
        assert hasattr(backbone, 'feat_channels'), \
            "The backbone should has feat_channels."
        assert len(backbone.feat_channels) >= len(backbone_indices), \
            f"The length of input backbone_indices ({len(backbone_indices)}) should not be " \
            f"greater than the length of feat_channels ({len(backbone.feat_channels)})."
        assert len(backbone.feat_channels) > max(backbone_indices), \
            f"The max value ({max(backbone_indices)}) of backbone_indices should be " \
            f"less than the length of feat_channels ({len(backbone.feat_channels)})."
        self.backbone = backbone

        assert len(backbone_indices) > 1, "The length of backbone_indices " \
            "should be greater than 1"
        # Copy so the instance never aliases the shared default list.
        self.backbone_indices = list(backbone_indices)
        backbone_out_chs = [backbone.feat_channels[i] for i in backbone_indices]

        # head
        if len(arm_out_chs) == 1:
            arm_out_chs = arm_out_chs * len(backbone_indices)
        assert len(arm_out_chs) == len(backbone_indices), "The length of " \
            "arm_out_chs and backbone_indices should be equal"

        self.ppseg_head = PPLiteSegHead(backbone_out_chs, arm_out_chs,
                                        cm_bin_sizes, cm_out_ch,
                                        resize_mode)

        if len(seg_head_inter_chs) == 1:
            seg_head_inter_chs = seg_head_inter_chs * len(backbone_indices)
        assert len(seg_head_inter_chs) == len(backbone_indices), "The length of " \
            "seg_head_inter_chs and backbone_indices should be equal"
        # One segmentation head per selected backbone feature, in the same order.
        self.seg_heads = nn.LayerList()
        for in_ch, mid_ch in zip(arm_out_chs, seg_head_inter_chs):
            self.seg_heads.append(SegHead(in_ch, mid_ch, num_classes))

    def forward(self, x):
        """
        Args:
            x (Tensor): Input image batch; its dims from index 2 on are used as the
                output size of the logits.

        Returns:
            List(Tensor): In training mode, one upsampled logit per head. In eval mode,
            a single-element list with the upsampled logit of the first head.
        """
        x_hw = paddle.shape(x)[2:]

        feats_backbone = self.backbone(x)
        assert len(feats_backbone) >= len(self.backbone_indices), \
            f"The nums of backbone feats ({len(feats_backbone)}) should be greater or " \
            f"equal than the nums of backbone_indices ({len(self.backbone_indices)})"

        feats_selected = [feats_backbone[i] for i in self.backbone_indices]
        feats_head = self.ppseg_head(feats_selected)

        if self.training:
            logit_list = [
                F.interpolate(
                    seg_head(feat), x_hw, mode='bilinear', align_corners=False)
                for feat, seg_head in zip(feats_head, self.seg_heads)
            ]
        else:
            # Inference only needs the first head.
            logit = self.seg_heads[0](feats_head[0])
            logit = F.interpolate(
                logit, x_hw, mode='bilinear', align_corners=False)
            logit_list = [logit]

        return logit_list
class PPLiteSegHead(nn.Layer):
    """
    The decoder head of PPLiteSegRknn.

    A context module processes the last input feature; its output is then
    fused with the input features one by one, from last to first, each through
    its own ``UAFM_SpAtten_S`` module.

    Args:
        backbone_out_chs (List(int)): Channels of the selected backbone outputs.
        arm_out_chs (List(int)): Output channels of each fusion module.
        cm_bin_sizes (List(int)): Bin sizes of the context module.
        cm_out_ch (int): Output channels of the context module.
        resize_mode (str): Resize mode for the upsampling in the fusion modules.
    """

    def __init__(self, backbone_out_chs, arm_out_chs, cm_bin_sizes, cm_out_ch, resize_mode):
        super().__init__()

        self.cm = PPContextModule(backbone_out_chs[-1], cm_out_ch, cm_out_ch,
                                  cm_bin_sizes)

        self.arm_list = nn.LayerList()
        last_idx = len(backbone_out_chs) - 1
        for idx, low_ch in enumerate(backbone_out_chs):
            # The last module takes the context output as its high-level
            # input; every other one takes the output of the next module.
            high_ch = cm_out_ch if idx == last_idx else arm_out_chs[idx + 1]
            self.arm_list.append(
                UAFM_SpAtten_S(
                    low_ch,
                    high_ch,
                    arm_out_chs[idx],
                    ksize=3,
                    resize_mode=resize_mode))

    def forward(self, in_feat_list):
        """
        Args:
            in_feat_list (List(Tensor)): Selected backbone features.

        Returns:
            out_feat_list (List(Tensor)): Fused features, in the same order and
                with the same length as ``in_feat_list``.
        """
        high_feat = self.cm(in_feat_list[-1])
        fused = []
        for idx in range(len(in_feat_list) - 1, -1, -1):
            high_feat = self.arm_list[idx](in_feat_list[idx], high_feat)
            fused.append(high_feat)
        # Features were produced last-to-first; restore the input order.
        fused.reverse()
        return fused
class PPContextModule(nn.Layer):
    """
    Simple context module.

    Each stage average-pools the input with ``nn.AvgPool2D`` and applies a 1x1
    ConvBNReLU. The stage outputs are resized back to the input's spatial
    size, summed, and passed through a 3x3 ConvBNReLU.

    Args:
        in_channels (int): Input channels of the module.
        inter_channels (int): Output channels of each pooling stage.
        out_channels (int): Output channels of the module.
        bin_sizes (tuple|list): One entry per stage. NOTE: each entry is
            passed as ``kernel_size`` to ``nn.AvgPool2D``, so here it is the
            pooling window size rather than the pooled output size.
        align_corners (bool, optional): Forwarded to ``F.interpolate``.
            Default: False.
    """

    def __init__(self,
                 in_channels,
                 inter_channels,
                 out_channels,
                 bin_sizes,
                 align_corners=False):
        super().__init__()

        stage_layers = [
            self._make_stage(in_channels, inter_channels, size)
            for size in bin_sizes
        ]
        self.stages = nn.LayerList(stage_layers)

        self.conv_out = ConvBNReLU(
            in_channels=inter_channels,
            out_channels=out_channels,
            kernel_size=3,
            padding=1)

        self.align_corners = align_corners

    def _make_stage(self, in_channels, out_channels, size):
        """Build one stage: fixed-kernel average pool, then 1x1 ConvBNReLU."""
        # Alternatives that were tried and left disabled:
        # pool = nn.AdaptiveAvgPool2D(output_size=size)
        # pool = CostomAdaptiveAvgPool2D(output_size=size, input_size=(24, 32))
        pool = nn.AvgPool2D(kernel_size=size)
        proj = ConvBNReLU(
            in_channels=in_channels, out_channels=out_channels, kernel_size=1)
        return nn.Sequential(pool, proj)

    def forward(self, input):
        """Sum the upsampled stage outputs and apply the output conv."""
        target_hw = paddle.shape(input)[2:]
        out = None

        for stage in self.stages:
            pooled = stage(input)
            upsampled = F.interpolate(
                pooled,
                target_hw,
                mode='bilinear',
                align_corners=self.align_corners)
            if out is None:
                out = upsampled
            else:
                out += upsampled

        return self.conv_out(out)
class SegHead(nn.Layer):
    """Segmentation head: 3x3 ConvBNReLU followed by a bias-free 1x1 conv.

    Args:
        in_chan (int): Input channels.
        mid_chan (int): Channels between the two convolutions.
        n_classes (int): Output channels of the final 1x1 convolution.
    """

    def __init__(self, in_chan, mid_chan, n_classes):
        super().__init__()
        self.conv = ConvBNReLU(
            in_chan,
            mid_chan,
            kernel_size=3,
            stride=1,
            padding=1,
            bias_attr=False)
        self.conv_out = nn.Conv2D(
            mid_chan, n_classes, kernel_size=1, bias_attr=False)

    def forward(self, x):
        """Return the per-class logits for ``x``."""
        return self.conv_out(self.conv(x))
if __name__ == "__main__":
    # Build the model, report parameter counts and export it to ONNX.
    #
    # Make sure the paddle versions match:
    #   pip install --upgrade "paddlepaddle==2.6.*" "paddle2onnx==1.0.*"
    import time
    import warnings

    import numpy as np
    from paddle.static import InputSpec

    warnings.filterwarnings('ignore')

    net = PPLiteSegRknn(num_classes=3, backbone=ShuffleNetV2_x0_25())

    total = sum(np.prod(param.shape) for param in net.parameters())
    print(f"模型参数量:{total / 1e6} M")
    encoder_params = sum(np.prod(p.shape) for p in net.backbone.parameters())
    print(f'encoder参数量:{encoder_params / 1e6} M')

    net.eval()

    # Dynamic batch dimension, fixed 3x768x1024 image.
    spec = [
        InputSpec(shape=[None, 3, 768, 1024], dtype='float32', name='image')
    ]
    paddle.onnx.export(
        net,
        "ppliteseg_rknn_v0911",
        input_spec=spec,
        # >= 12 recommended for interpolation / activation operator support.
        opset_version=13,
        enable_onnx_checker=True)
# test_num = 10
# count = 0
# cost_time = 0.0
# for i in range(test_num):
# count += 1
# t1 = time.time()
# outputTensor = net(inputTensor)
# t2 = time.time()
# if count > 1:
# cost_time += (t2 - t1) * 1000
#
# print(f'average cost time:{cost_time / (count - 1)}ms')
# print(f'len(outputTensor)={len(outputTensor)}')
# print(f'type(outputTensor)={type(outputTensor)}')
# print(f'type(outputTensor[0])={type(outputTensor[0])}')
# print(f'outputTensor[0].shape={outputTensor[0].shape}')