『PyTorch × TensorFlow』第十七弹_ResNet快速实现

『TensorFlow』读书笔记_ResNet_V2

对比之前的复杂版本,这次的torch实现其实简单了不少,不过这和上面的代码实现逻辑过于复杂也有关系。

# Author : hellcat
# Time   : 18-3-2

"""
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"
 
import numpy as np
np.set_printoptions(threshold=np.inf)
 
import tensorflow as tf
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)
"""

import torch as t
import torch.nn as nn
from torch.nn import functional as F


class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 convolutions plus a skip connection.

    The main path is stored as ``self.left`` and consists of
    conv3x3 -> BatchNorm -> ReLU -> conv3x3 (no bias) -> BatchNorm.
    The skip path is stored as ``self.right``; when it is ``None`` the
    block input itself is added to the main-path output.

    Args:
        inchannel: number of channels of the incoming feature map.
        outchannel: number of channels produced by both convolutions.
        stride: stride of the first convolution; the second one uses 1.
        shortcut: optional callable applied to the input before the
            addition. Its result must be addable to the main-path output.
    """

    def __init__(self, inchannel, outchannel, stride=1, shortcut=None):
        super().__init__()
        main_path = [
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, 3, 1, 1, bias=False),
            nn.BatchNorm2d(outchannel),
        ]
        self.left = nn.Sequential(*main_path)
        self.right = shortcut

    def forward(self, x):
        """Return ``relu(left(x) + skip)`` where skip is ``x`` or ``right(x)``."""
        out = self.left(x)
        if self.right is None:
            out += x
        else:
            out += self.right(x)
        return F.relu(out)


class ResNet(nn.Module):
    """ResNet-34-style classifier assembled from ``ResidualBlock`` units.

    Layout: a stem (``pre``: 7x7 stride-2 conv, BatchNorm, ReLU, 3x3
    stride-2 max-pool), four residual stages (``layer1`` .. ``layer4``
    with 3, 4, 6 and 3 blocks) and a final linear layer ``fc``.

    The head applies a fixed 7x7 average pool and feeds 512 features to
    ``fc``. According to the original author's shape annotations, a
    ``[1, 3, 224, 224]`` input gives ``[1, 64, 56, 56]`` after the stem and
    ``[1, 512, 7, 7]`` after ``layer4``.

    Args:
        num_classes: size of the output of the final linear layer.
    """

    def __init__(self, num_classes=1000):
        super().__init__()
        stem = [
            nn.Conv2d(3, 64, 7, 2, 3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        ]
        self.pre = nn.Sequential(*stem)

        # Stages are created in order so that parameter registration (and
        # therefore seeded initialisation) follows layer1 -> layer4.
        self.layer1 = self._make_layer(64, 128, 3)
        self.layer2 = self._make_layer(128, 256, 4, stride=2)
        self.layer3 = self._make_layer(256, 512, 6, stride=2)
        self.layer4 = self._make_layer(512, 512, 3, stride=2)

        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, inchannel, outchannel, block_num, stride=1):
        """Build one stage of ``block_num`` residual blocks.

        The first block changes the channel count / stride and therefore
        receives a 1x1 conv + BatchNorm projection as its shortcut; the
        remaining ``block_num - 1`` blocks keep the shape and use the
        identity shortcut.
        """
        projection = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, 1, stride, bias=False),
            nn.BatchNorm2d(outchannel),
        )
        first = ResidualBlock(inchannel, outchannel, stride, projection)
        rest = [ResidualBlock(outchannel, outchannel) for _ in range(block_num - 1)]
        return nn.Sequential(first, *rest)

    def forward(self, x):
        """Run stem, the four stages, 7x7 average pooling and the classifier."""
        x = self.pre(x)
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = F.avg_pool2d(x, 7)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

def hook(module, inputdata, output):
    """Forward hook that prints the size of the hooked layer's output.

    Nothing is stored and nothing is returned; ``module`` and ``inputdata``
    are accepted only because the forward-hook signature supplies them.
    """
    output_size = output.data.size()
    print("钩子输出:", output_size)


# Demo: build the network and a single random 1x3x224x224 input.
module = ResNet()
img = t.autograd.Variable(t.randn(1, 3, 224, 224))
# Attach the printing hook to the first layer of the stem (module.pre[0]).
handle = module.pre[0].register_forward_hook(hook)
out = module(img)
# Detach the hook again once the forward pass has run.
handle.remove()
print(out)

上面代码中,我们注册了钩子来查看中间层的输出。需要注意,torch中的卷积层默认padding=0(相当于TensorFlow的VALID模式),并不存在默认的SAME模式;这里是因为7×7卷积显式设置了padding=3,输出尺寸才等于in/stride,与TensorFlow中SAME模式的结果一致,

torch.Size([1, 64, 112, 112])

Variable containing:

0.6336 -0.5863 0.6472 ... -0.4694 0.1808 0.2837

[torch.FloatTensor of size 1x1000]

二、TensorFlow实现

同样的逻辑下,ResNet34的TensorFlow实现如下,使用的封装包ops之前有介绍过,这里小修了卷积层的封装,使得conv2d可以舍弃bias(即卷积计算后不加偏置),

# Author : hellcat
# Time   : 18-3-7

"""
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"
 
import numpy as np
np.set_printoptions(threshold=np.inf)
"""
import ops
import tensorflow as tf
# Session configuration: allow_growth asks TensorFlow 1.x to grow its GPU
# memory allocation on demand instead of reserving it all up front.
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
# Module-level session created with that configuration.
sess = tf.Session(config=config)


def ResidualBlock(x,
                  outchannel, stride=1, shortcut=None,
                  train=True, name="ResidualBlock"):
    """Build one residual unit inside variable scope ``name``.

    Main path: conv3x3 ("conv1", stride ``stride``) -> batch norm ("bn1")
    -> ReLU -> conv3x3 ("conv2", stride 1, no bias) -> batch norm ("bn2").
    The skip path is ``shortcut(x)`` when a callable is given, else ``x``.

    :param x: input tensor
    :param outchannel: number of output channels of both convolutions
    :param stride: stride of the first convolution
    :param shortcut: optional callable producing the skip-path tensor
    :param train: forwarded to ``ops.batch_normal`` as ``train``
    :param name: variable scope wrapping the whole unit
    :return: ``relu(main_path + skip_path)``
    """
    with tf.variable_scope(name):
        main = ops.conv2d(x, outchannel,
                          k_h=3, k_w=3,
                          s_h=stride, s_w=stride, scope="conv1")
        main = ops.batch_normal(main, train=train, scope="bn1")
        main = tf.nn.relu(main)
        main = ops.conv2d(main, outchannel,
                          k_h=3, k_w=3,
                          s_h=1, s_w=1,
                          with_bias=False, scope="conv2")
        main = ops.batch_normal(main, train=train, scope="bn2")

        # The skip path is built after the main path, as in the original.
        if shortcut is None:
            skip = x
        else:
            skip = shortcut(x)
        return tf.nn.relu(main + skip)


class ResNet():
    """ResNet-34-style graph built with TensorFlow 1.x and the ``ops`` helpers.

    Creating an instance builds the complete graph for a fixed
    ``[1, 224, 224, 3]`` float32 input: a stem ("pre": 7x7 stride-2 conv
    without bias, batch norm, ReLU, 3x3 stride-2 max-pool), four residual
    stages ("layer1" .. "layer4" with 3, 4, 6 and 3 blocks), a 7x7 average
    pool and a final linear layer.

    Attributes:
        x: the input placeholder. It is exposed so callers can feed images,
           e.g. ``sess.run(net(), feed_dict={net.x: batch})``.
        fc: output tensor of the final linear layer.

    :param num_classes: number of outputs of the final linear layer
                        (defaults to 1000, the previously hard-coded value)
    """

    def __init__(self, num_classes=1000):
        with tf.variable_scope("input"):
            x = tf.placeholder(dtype=tf.float32, shape=[1, 224, 224, 3])
        # Keep a handle on the placeholder; as a bare local it could not be
        # fed through the object.
        self.x = x
        with tf.variable_scope("pre"):
            conv = ops.conv2d(x, output_dim=64,
                              k_h=7, k_w=7,
                              s_h=2, s_w=2,
                              with_bias=False)
            bn = tf.nn.relu(ops.batch_normal(conv))
            pool = tf.nn.max_pool(bn, ksize=[1, 3, 3, 1],
                                  strides=[1, 2, 2, 1], padding='SAME')

        with tf.variable_scope("layer1"):
            layer1 = self._make_layer(pool, outchannel=128, block_num=3)
        with tf.variable_scope("layer2"):
            layer2 = self._make_layer(layer1, outchannel=256, block_num=4, stride=2)
        with tf.variable_scope("layer3"):
            layer3 = self._make_layer(layer2, outchannel=512, block_num=6, stride=2)
        with tf.variable_scope("layer4"):
            layer4 = self._make_layer(layer3, outchannel=512, block_num=3, stride=2)
        # Stage outputs recorded by the original author:
        # Tensor("layer1/ResidualBlock2/Relu_1:0", shape=(1, 56, 56, 128), dtype=float32)
        # Tensor("layer2/ResidualBlock3/Relu_1:0", shape=(1, 28, 28, 256), dtype=float32)
        # Tensor("layer3/ResidualBlock5/Relu_1:0", shape=(1, 14, 14, 512), dtype=float32)
        # Tensor("layer4/ResidualBlock2/Relu_1:0", shape=(1, 7, 7, 512), dtype=float32)
        pool = tf.nn.avg_pool(layer4, ksize=[1, 7, 7, 1],
                              strides=[1, 7, 7, 1], padding='SAME')
        # Flatten to [batch, features] before the classifier.
        reshape = tf.reshape(pool, [layer4.get_shape()[0], -1])
        self.fc = ops.linear(reshape, num_classes)

    def __call__(self, *args, **kwargs):
        """Return the output tensor ``self.fc``; any arguments are ignored."""
        return self.fc

    def _make_layer(self, x,
                    outchannel,
                    block_num, stride=1):
        """Stack ``block_num`` residual blocks on top of ``x``.

        The first block ("ResidualBlock0") uses ``stride`` and a projection
        shortcut (1x1 conv without bias + batch norm, built in scope
        "shortcut"); the following blocks use stride 1 and the identity
        shortcut.

        :param x: input tensor of the stage
        :param outchannel: number of output channels of every block
        :param block_num: total number of blocks in the stage
        :param stride: stride of the first block and of its shortcut
        :return: output tensor of the last block
        """

        def shortcut(input_):
            with tf.variable_scope("shortcut"):
                conv = ops.conv2d(input_, output_dim=outchannel,
                                  k_w=1, k_h=1, s_w=stride, s_h=stride,
                                  with_bias=False)
                return ops.batch_normal(conv)

        x = ResidualBlock(x, outchannel, stride,
                          shortcut, name="ResidualBlock0")
        for i in range(1, block_num):
            x = ResidualBlock(x, outchannel,
                              name="ResidualBlock{}".format(i))
        return x

if __name__ == "__main__":
    # Build the graph once and print the output tensor returned by ResNet.__call__.
    resnet = ResNet()
    print(resnet())

ops.py卷积修改封装如下,

def conv2d(input_, output_dim,
           k_h=5, k_w=5, s_h=2, s_w=2, stddev=0.02,
           scope="conv2d", with_w=False, with_bias=True):
    """
    2-D convolution wrapper ('SAME' padding) with an optional bias term.

    :param input_: input tensor; its last dimension is used as the number
                   of input channels of the kernel
    :param output_dim: number of output feature maps
    :param k_h: kernel height
    :param k_w: kernel width
    :param s_h: stride along height
    :param s_w: stride along width
    :param stddev: stddev of the truncated-normal kernel initializer
    :param scope: variable scope holding the variables 'w' and 'biases'
    :param with_w: when true, also return the kernel and bias variables
    :param with_bias: whether a zero-initialised bias is created and added
    :return: the output tensor, or ``(output, w, biases)`` when ``with_w``
             is true (``biases`` is None if ``with_bias`` is false)
    """
    biases = None
    with tf.variable_scope(scope):
        in_channels = input_.get_shape()[-1]
        w = tf.get_variable('w', [k_h, k_w, in_channels, output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, s_h, s_w, 1], padding='SAME')
        if with_bias:
            biases = tf.get_variable('biases', [output_dim], initializer=tf.constant_initializer(0.0))
            biased = tf.nn.bias_add(conv, biases)
            # Reshaped back to the static shape of the convolution output.
            conv = tf.reshape(biased, conv.get_shape())

    return (conv, w, biases) if with_w else conv

输出如下,

Tensor("Linear/add:0", shape=(1, 1000), dtype=float32)

附:

ops.py截止本文发布的最新版本状态,

# Author : hellcat
# Time   : 18-1-21
# Usage  : 网络层函数封装
"""
conv2d
deconv2d
lrelu
linear
"""

import tensorflow as tf


# def batch_normal(x, train=True, epsilon=1e-5, decay=0.9, scope="batch_norm"):
#     return tf.contrib.layers.batch_norm(x,
#                                         decay=decay,
#                                         updates_collections=None,
#                                         epsilon=epsilon,
#                                         scale=True,
#                                         is_training=train,
#                                         scope=scope)

def batch_normal(x, epsilon=1e-5, momentum=0.9, train=True, scope='batch_norm'):
    """
    Batch-normalisation wrapper around ``tf.contrib.layers.batch_norm``.

    :param x: input tensor
    :param epsilon: passed through as ``epsilon``
    :param momentum: passed through as ``decay``
    :param train: passed through as ``is_training``
    :param scope: variable scope in which the layer is created
    :return: result of ``tf.contrib.layers.batch_norm``
    """
    bn_kwargs = dict(decay=momentum,
                     updates_collections=None,
                     epsilon=epsilon,
                     scale=True,
                     is_training=train)
    with tf.variable_scope(scope):
        return tf.contrib.layers.batch_norm(x, **bn_kwargs)
'''
Note: when training, the moving_mean and moving_variance need to be updated.
By default the update ops are placed in `tf.GraphKeys.UPDATE_OPS`, so they
need to be added as a dependency to the `train_op`. For example:

```python
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        train_op = optimizer.minimize(loss)
```

One can set updates_collections=None to force the updates in place, but that
can have a speed penalty, especially in distributed settings.
'''


# class batch_norm(object):
#     def __init__(self, epsilon=1e-5, decay=0.9, scope="batch_norm"):
#         with tf.variable_scope(scope):
#             self.epsilon = epsilon
#             self.decay = decay
#             # self.scope = scope
#
#     def __call__(self, x, scope, train=True):
#         return tf.contrib.layers.batch_norm(x,
#                                             decay=self.decay,
#                                             updates_collections=None,
#                                             epsilon=self.epsilon,
#                                             scale=True,
#                                             is_training=train,
#                                             scope=scope)


def concat(tensor_a, tensor_b):
    """
    Concatenate two tensors along axis 3, centre-cropping the larger one.

    The static sizes of axis 1 are compared. The tensor that is larger
    there is sliced to the batch size, height and width of the other
    tensor, keeping its own channels, and then the two are concatenated
    with ``tensor_a`` first. If axis 1 is equal, the tensors are
    concatenated unchanged.

    Fix over the previous version: the crop offset along axis 2 is derived
    from the axis-2 sizes. It used to reuse the axis-1 difference, which is
    only correct when both tensors are square.

    :param tensor_a: first tensor (comes first in the result)
    :param tensor_b: second tensor
    :return: concatenation along axis 3
    """

    def _center_crop(big, small):
        # Crop `big` to the batch/height/width of `small`, centred on axes 1 and 2.
        offset_h = (int(big.shape[1]) - int(small.shape[1])) // 2
        offset_w = (int(big.shape[2]) - int(small.shape[2])) // 2
        return tf.slice(big,
                        begin=[0, offset_h, offset_w, 0],
                        size=[int(small.shape[0]), int(small.shape[1]),
                              int(small.shape[2]), int(big.shape[3])],
                        name='slice')

    height_a = tensor_a.get_shape().as_list()[1]
    height_b = tensor_b.get_shape().as_list()[1]

    if height_a > height_b:
        return tf.concat([_center_crop(tensor_a, tensor_b), tensor_b],
                         axis=3, name='concat')
    if height_a < height_b:
        return tf.concat([tensor_a, _center_crop(tensor_b, tensor_a)],
                         axis=3, name='concat')
    return tf.concat([tensor_a, tensor_b], axis=3)


def conv_cond_concat(x, y):
    """
    Broadcast ``y`` over the first three dimensions of ``x`` and append it
    along axis 3 (used to attach labels to feature maps in an AC-GAN).

    :param x: features, e.g. shape [n, 16, 16, 128]
    :param y: labels with expanded dimensions, e.g. shape [n, 1, 1, 10]
    :return: concatenated features, e.g. shape [n, 16, 16, 138]
    """
    x_dims = x.get_shape()
    y_dims = y.get_shape()
    # Multiplying by a ones tensor expands y to x's first three dimensions.
    tiled_shape = [x_dims[0], x_dims[1], x_dims[2], y_dims[3]]
    broadcast_y = y * tf.ones(tiled_shape)
    return tf.concat([x, broadcast_y], axis=3)


def conv2d(input_, output_dim,
           k_h=5, k_w=5, s_h=2, s_w=2, stddev=0.02,
           scope="conv2d", with_w=False, with_bias=True):
    """
    2-D convolution wrapper ('SAME' padding) with an optional bias term.

    :param input_: input tensor; its last dimension is used as the number
                   of input channels of the kernel
    :param output_dim: number of output feature maps
    :param k_h: kernel height
    :param k_w: kernel width
    :param s_h: stride along height
    :param s_w: stride along width
    :param stddev: stddev of the truncated-normal kernel initializer
    :param scope: variable scope holding the variables 'w' and 'biases'
    :param with_w: when true, also return the kernel and bias variables
    :param with_bias: whether a zero-initialised bias is created and added
    :return: the output tensor, or ``(output, w, biases)`` when ``with_w``
             is true (``biases`` is None if ``with_bias`` is false)
    """
    biases = None
    with tf.variable_scope(scope):
        in_channels = input_.get_shape()[-1]
        w = tf.get_variable('w', [k_h, k_w, in_channels, output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, s_h, s_w, 1], padding='SAME')
        if with_bias:
            biases = tf.get_variable('biases', [output_dim], initializer=tf.constant_initializer(0.0))
            biased = tf.nn.bias_add(conv, biases)
            # Reshaped back to the static shape of the convolution output.
            conv = tf.reshape(biased, conv.get_shape())

    return (conv, w, biases) if with_w else conv


def deconv2d(input_, output_shape,
             k_h=5, k_w=5, s_h=2, s_w=2, stddev=0.02,
             scope="deconv2d", with_w=False):
    """
    Transposed-convolution wrapper with a zero-initialised bias.

    :param input_: input tensor; its last dimension is the kernel's
                   input-channel count
    :param output_shape: shape of the output; its last entry is the number
                         of output channels
    :param k_h: kernel height
    :param k_w: kernel width
    :param s_h: stride along height
    :param s_w: stride along width
    :param stddev: stddev of the random-normal kernel initializer
    :param scope: variable scope holding the variables 'w' and 'biases'
    :param with_w: when true, also return the kernel and bias variables
    :return: the output tensor, or ``(output, w, biases)`` when ``with_w``
             is true
    """
    strides = [1, s_h, s_w, 1]
    with tf.variable_scope(scope):
        out_channels = output_shape[-1]
        # filter : [height, width, output_channels, in_channels]
        w = tf.get_variable('w', [k_h, k_w, out_channels, input_.get_shape()[-1]],
                            initializer=tf.random_normal_initializer(stddev=stddev))

        try:
            deconv = tf.nn.conv2d_transpose(input_, w, output_shape=output_shape,
                                            strides=strides)
        except AttributeError:
            # Support for versions of TensorFlow before 0.7.0
            deconv = tf.nn.deconv2d(input_, w, output_shape=output_shape,
                                    strides=strides)

        biases = tf.get_variable('biases', [out_channels], initializer=tf.constant_initializer(0.0))
        biased = tf.nn.bias_add(deconv, biases)
        deconv = tf.reshape(biased, deconv.get_shape())

    if with_w:
        return deconv, w, biases
    return deconv


def lrelu(x, leak=0.2):
    """
    Leaky ReLU: element-wise maximum of ``x`` and ``leak * x``.

    :param x: input tensor
    :param leak: factor applied to ``x`` for the second operand
    :return: ``tf.maximum(x, leak * x)``
    """
    scaled = leak*x
    return tf.maximum(x, scaled)


def linear(input_, output_size,
           stddev=0.02, bias_start=0.0,
           scope=None, with_w=False):
    """
    Fully connected layer: ``input_ @ Matrix + bias``.

    :param input_: input tensor; its static dimension 1 is the input size
    :param output_size: number of output units
    :param stddev: stddev of the random-normal weight initializer
    :param bias_start: constant used to initialise the bias
    :param scope: variable scope name; "Linear" is used when it is falsy
    :param with_w: when true, also return the weight and bias variables
    :return: the output tensor, or ``(output, matrix, bias)`` when
             ``with_w`` is true
    """
    dims = input_.get_shape().as_list()

    with tf.variable_scope(scope or "Linear"):
        matrix = tf.get_variable("Matrix", [dims[1], output_size], tf.float32,
                                 tf.random_normal_initializer(stddev=stddev))
        bias = tf.get_variable("bias", [output_size], initializer=tf.constant_initializer(bias_start))
        projected = tf.matmul(input_, matrix) + bias

    if with_w:
        return projected, matrix, bias
    return projected