TensorFlow 2.0 Study Notes (8): Getting to Know the keras Module

# -*- coding: utf-8 -*-

import tensorflow as tf
from tensorflow import keras
# Import common network layers, the Sequential container, optimizers, loss functions, and metrics
from tensorflow.keras import layers, Sequential, optimizers, losses, metrics
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import datetime
import io
import matplotlib.pyplot as plt

# 1 The Softmax layer
"""
x = tf.constant([2., 1., 0.1])
layer = layers.Softmax(axis=-1)  # create a Softmax layer
out = layer(x)  # equivalent to tf.nn.softmax(x)
print(out)
"""


# 2 The Sequential container, method one: pass a list of layers
"""
network = Sequential([
    layers.Dense(3, activation=None),
    layers.ReLU(),
    layers.Dense(2, activation=None),
    layers.ReLU()
])
x = tf.random.normal([4, 3])
out = network(x)
print(out)
"""
# 2 The Sequential container, method two: add() layers in a loop
"""
layers_num = 2  # stack the Dense + ReLU pair twice
network = Sequential([])  # start from an empty container
for _ in range(layers_num):
    network.add(layers.Dense(3))  # add a fully connected layer
    network.add(layers.ReLU())
network.build(input_shape=(4, 4))  # create the network parameters
network.summary()
for p in network.trainable_variables:
    print(p.name, p.shape)
"""


# 3 Model assembly (compile)
# input data
path = r'G:\2019\python\mnist.npz'
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(path)
x = tf.convert_to_tensor(x_train, dtype=tf.float32)/255.    # scale to [0, 1]; scaling to [-1, 1] trained poorly here (lower accuracy)
x = tf.reshape(x, [-1, 28*28])
y = tf.convert_to_tensor(y_train, dtype=tf.int32)
y = tf.one_hot(y, depth=10)

train_db = tf.data.Dataset.from_tensor_slices((x, y))
train_db = train_db.shuffle(60000)      # shuffle buffer ideally as large as the dataset
train_db = train_db.batch(100)          # batch size 100 (128 also works)


def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255.     # cast to float32, then normalize to [0, 1]
    x = tf.reshape(x, [-1, 28*28])              # -1 for the unknown batch size; flatten each image to 784 values
    y = tf.cast(y, dtype=tf.int32)              # cast labels to int32
    y = tf.one_hot(y, depth=10)                 # one-hot encode the labels for training
    return x, y


# Preprocess the 10,000 test samples
test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_db = test_db.shuffle(10000)
test_db = test_db.batch(100)        # batch size 100 (128 also works)
val_db = test_db.map(preprocess)
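# A quick shape check I added: with batch size 100, one batch from either pipeline
# should be (100, 784) float images and (100, 10) one-hot labels.
"""
x_b, y_b = next(iter(train_db))
print('train batch:', x_b.shape, y_b.shape)  # (100, 784) (100, 10)
x_b, y_b = next(iter(val_db))
print('val batch:', x_b.shape, y_b.shape)    # (100, 784) (100, 10)
"""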
""" # 用于4中调试用
network = Sequential([
    layers.Dense(256, activation='relu'),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(32, activation='relu'),
    layers.Dense(10)
])
network.build(input_shape=(4, 28*28))
network.summary()
network.compile(optimizer=optimizers.Adam(learning_rate=0.01),  # Adam optimizer
                loss=losses.CategoricalCrossentropy(from_logits=True),  # cross-entropy loss
                metrics=['accuracy'])  # track accuracy as the metric

# 3 Model training
# Train for 5 epochs, validating every 2 epochs; fit() drives the whole training loop
history = network.fit(train_db, epochs=5, validation_data=val_db, validation_freq=2)
# history.history

# 3 Model testing
x, y = next(iter(val_db))  # take one batch of test data
print('predict x:', x.shape)  # print the shape of the current batch
out = network.predict(x)  # model predictions are returned in out
print(out)
# network.evaluate(val_db)  # evaluate the model's performance on the whole test set

# 4 Saving the model
# method 1: weights only
# network.save_weights('weights.ckpt')
# print('saved weights.')
# del network
# method 2: full model (architecture + weights + compile config) in HDF5
# network.save('exam6_model.h5')
# print('saved total model.')
# del network
# method 3: SavedModel format
tf.saved_model.save(network, 'exam6_model-savedmodel')
print('saved savedmodel.')
del network
"""
# Recreate the same network, method 1: the model-building source code is available
"""
network = Sequential([
    layers.Dense(256, activation='relu'),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(32, activation='relu'),
    layers.Dense(10)
])
network.build(input_shape=(4, 28*28))
network.summary()
network.compile(optimizer=optimizers.Adam(learning_rate=0.01),  # Adam optimizer
                loss=losses.CategoricalCrossentropy(from_logits=True),  # cross-entropy loss
                metrics=['accuracy'])  # track accuracy as the metric

# Load the saved parameters from file into the current network
network.load_weights('weights.ckpt')
print('loaded weights!')  # raises "Failed to find any matching files for weights.ckpt" if the checkpoint name/path differs from the one used in save_weights()
"""

# Method 2: the model-building source code is not available -- load the full model
# network = keras.models.load_model('exam6_model.h5')
# network.summary()
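# Sketch (my addition): because save() stored the architecture, weights and compile
# configuration together, the reloaded HDF5 model can predict and evaluate directly.
"""
network = keras.models.load_model('exam6_model.h5')
x_b, y_b = next(iter(val_db))
print(network.predict(x_b).shape)  # (100, 10) logits for one test batch
print(network.evaluate(val_db))    # [loss, accuracy] using the restored compile settings
"""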

# Method 3: the SavedModel format
"""
print('load savedmodel from file.')
network = tf.saved_model.load('exam6_model-savedmodel')
acc_meter = metrics.CategoricalAccuracy()
for x, y in val_db:
    pred = network(x)
    acc_meter.update_state(y_true=y, y_pred=pred)
print("Test Accuracy:%f" % acc_meter.result())  # Test Accuracy:0.967000
"""

# 5 Custom layers and models
"""

class MyDense(layers.Layer):
    def __init__(self, inp_dim, outp_dim):
        super(MyDense, self).__init__()
        # Create the weight tensor and register it with the layer's variable lists
        self.kernel = self.add_weight('W', [inp_dim, outp_dim], trainable=True)

    def call(self, inputs, training=None):
        out = inputs @ self.kernel
        out = tf.nn.relu(out)
        return out


# net = MyDense(4, 3)
# print(net.variables, net.trainable_variables)


network = Sequential([
    MyDense(784, 256),
    MyDense(256, 128),
    MyDense(128, 64),
    MyDense(64, 32),
    MyDense(32, 10)
])
network.build(input_shape=(None, 28*28))
network.summary()


class MyModel(keras.Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.fc1 = MyDense(28*28, 256)
        self.fc2 = MyDense(256, 128)
        self.fc3 = MyDense(128, 64)
        self.fc4 = MyDense(64, 32)
        self.fc5 = MyDense(32, 10)

    def call(self, inputs, training=None):
        x = self.fc1(inputs)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        x = self.fc5(x)
        return x
"""

# 6 Model zoo (pretrained models)
"""
# Load an ImageNet-pretrained ResNet50 and drop its top classification layer
resnet = keras.applications.ResNet50(weights='imagenet', include_top=False)

# resnet.summary()
# x = tf.random.normal([4, 224, 224, 3])
# out = resnet(x)
# print(out)  # shape=(4, 7, 7, 2048)

# Create a new global average pooling layer
global_average_layer = layers.GlobalAveragePooling2D()
# x = tf.random.normal([4, 7, 7, 2048])
# out = global_average_layer(x)
# print(out.shape)  # (4, 2048)

# Create a new fully connected (classification) layer
fc = layers.Dense(100)
# x = tf.random.normal([4, 2048])
# out = fc(x)
# print(out.shape)  # (4, 100)

# Wrap the backbone, pooling layer and new head into a single model
mynet = Sequential([resnet, global_average_layer, fc])
mynet.summary()
"""

# 7 Accuracy and loss meters in a custom training loop
network = Sequential([
    layers.Dense(256, activation='relu'),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(32, activation='relu'),
    layers.Dense(10)
])
network.build(input_shape=(None, 28*28))
network.summary()
optimizer = optimizers.Adam(learning_rate=0.01)
acc_meter = metrics.Accuracy()  # accuracy meter (expects predicted class ids and labels)
loss_meter = metrics.Mean()  # running-mean meter for averaging the loss
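# Quick standalone demo of the meter API (my addition): update_state() accumulates,
# result() reads the current statistic, reset_states() clears it.
"""
m = metrics.Mean()
m.update_state(1.0)
m.update_state(3.0)
print(m.result().numpy())   # 2.0
m.reset_states()
a = metrics.Accuracy()
a.update_state([1, 2, 3], [1, 2, 0])
print(a.result().numpy())   # 0.6666667 -- Accuracy compares class ids elementwise
"""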


for step, (x, y) in enumerate(train_db):    # iterate over the batched training data; with batch size 100, step runs 0..599
    with tf.GradientTape() as tape:
        out = network(x)
        loss = tf.reduce_mean(tf.losses.categorical_crossentropy(y, out, from_logits=True))
        loss_meter.update_state(float(loss))  # record the loss value

    grads = tape.gradient(loss, network.trainable_variables)
    optimizer.apply_gradients(zip(grads, network.trainable_variables))

    if step % 100 == 0:
        print(step, 'loss:', loss_meter.result().numpy())  # read the averaged statistic
        loss_meter.reset_states()  # reset the meter

    # evaluate on the test set
    if step % 500 == 0:
        total, total_correct = 0., 0
        acc_meter.reset_states()
        for x_val, y_val in val_db:
            out = network(x_val)
            pred = tf.argmax(out, axis=1, output_type=tf.int32)       # predicted class ids
            y_true = tf.argmax(y_val, axis=1, output_type=tf.int32)   # recover labels from the one-hot encoding
            correct = tf.equal(pred, y_true)
            total_correct += tf.reduce_sum(tf.cast(correct, dtype=tf.int32)).numpy()
            total += x_val.shape[0]
            acc_meter.update_state(y_true, pred)  # Accuracy expects class ids, not logits
        print(step, 'Evaluate Acc:', total_correct/total, acc_meter.result().numpy())
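# Alternative I find simpler (my addition): metrics.CategoricalAccuracy accepts one-hot
# labels and raw logits directly, so the manual argmax bookkeeping above can be skipped.
"""
cat_acc = metrics.CategoricalAccuracy()
for x_val, y_val in val_db:
    cat_acc.update_state(y_val, network(x_val))  # one-hot y_true, logits y_pred
print('CategoricalAccuracy:', cat_acc.result().numpy())
"""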

These examples really show how much keras simplifies building and training neural networks!