使用卷积神经网络CNN完成验证码识别

gen_sample_by_captcha.py 生成验证码图片

# -*- coding: UTF-8 -*-
"""
使用captcha lib生成验证码(前提:pip install captcha)
"""
from captcha.image import ImageCaptcha
import os
import random
import time


def gen_special_img(text, file_path):
    """Render ``text`` as a captcha image and save it to ``file_path``.

    The canvas size is read from the module-level names ``width`` and
    ``height``, which must be defined before this function is called.
    """
    captcha = ImageCaptcha(height=height, width=width)
    captcha.generate_image(text).save(file_path)


if __name__ == "__main__":
    # Configuration
    root_dir = "../sample/origin/"  # directory the images are written to
    image_suffix = "png"  # file extension of the generated images
    characters = "0123456789"  # characters that may appear on an image
    # characters = "0123456789abcdefghijklmnopqrstuvwxyz"
    count = 10000  # number of samples to generate
    char_count = 4  # number of characters per image

    # Image size; read as globals by gen_special_img
    width = 100
    height = 60

    # makedirs also creates missing parent directories and tolerates an
    # already existing target, which a bare os.mkdir does not.
    os.makedirs(root_dir, exist_ok=True)

    for i in range(count):
        text = "".join(random.choice(characters) for _ in range(char_count))
        # The timestamp (decimal point removed) keeps file names distinct
        # when the same label is drawn more than once.
        timec = str(time.time()).replace(".", "")
        p = os.path.join(root_dir, "{}_{}.{}".format(text, timec, image_suffix))
        gen_special_img(text, p)

sample.py 配置文件

from easydict import EasyDict
import os
import json

# 可以使得以属性的方式去访问字典的值
# EasyDict lets the settings be read as attributes as well as dictionary keys.
sample_conf = EasyDict()

# Image directories
sample_conf.origin_image_dir = "./sample/origin/"
sample_conf.train_image_dir = "./sample/train/"
sample_conf.test_image_dir = "./sample/test/"
sample_conf.api_image_dir = "./sample/api/"
sample_conf.online_image_dir = "./sample/online/"
sample_conf.local_image_dir = "./sample/local/"

# Model directory
sample_conf.model_save_dir = "./model/"

# Image parameters
sample_conf.image_width = 100
sample_conf.image_height = 60
sample_conf.max_captcha = 4
sample_conf.image_suffix = "png"

# Character classes: digits followed by lowercase letters
sample_conf.char_set = list("0123456789abcdefghijklmnopqrstuvwxyz")
# char_set = list("abcdefghijklmnopqrstuvwxyz")
# char_set = list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")

# Optionally replace the character set with the contents of a JSON file.
use_labels_json_file = False
if use_labels_json_file and os.path.exists("gen_image/labels.json"):
    with open("gen_image/labels.json", "r") as f:
        content = f.read()
    # An empty file leaves the built-in character set untouched.
    if content:
        sample_conf.char_set = json.loads(content)

sample_conf.remote_url = "https://www.xxxxx.com/getImg"

verify_and_split_data.py 校验图片尺寸并划分训练集和测试集

"""
验证图片尺寸和划分测试集(5%)和训练集(95%)
"""
from PIL import Image
import random
import os
import shutil
from sample import sample_conf


def verify(origin_dir, real_width, real_height, image_suffix):
    """Check every file in ``origin_dir`` and report the unusable ones.

    A file is reported when its name does not end with ``image_suffix``,
    when its name is not of the form ``<label>_<rest>``, when PIL cannot
    open it, or when its size differs from ``(real_width, real_height)``.

    :param origin_dir: directory containing the raw captcha images
    :param real_width: expected image width
    :param real_height: expected image height
    :param image_suffix: required file-name suffix, e.g. ``"png"``
    :return: list of ``(index, file_name, reason)`` tuples, one per bad file
    """
    print("开始校验原始图片集")
    real_size = (real_width, real_height)
    img_list = os.listdir(origin_dir)
    total_count = len(img_list)
    print("原始集共有图片: {}张".format(total_count))

    bad_img = []

    for index, img_name in enumerate(img_list):
        file_path = os.path.join(origin_dir, img_name)

        # Wrong file suffix
        if not img_name.endswith(image_suffix):
            bad_img.append((index, img_name, "文件后缀不正确"))
            continue

        # Malformed label. partition() never raises, whereas unpacking
        # split("_") fails on names with no underscore or more than one.
        prefix, sep, posfix = img_name.partition("_")
        if not sep or prefix == "" or posfix == "":
            bad_img.append((index, img_name, "图片标签异常"))
            continue

        # Unreadable image; the context manager closes the file handle.
        try:
            with Image.open(file_path) as img:
                img_size = img.size
        except OSError:
            bad_img.append((index, img_name, "图片无法正常打开"))
            continue

        # Unexpected image size
        if real_size == img_size:
            print("{} pass".format(index), end="\r")
        else:
            bad_img.append((index, img_name, "图片尺寸异常为:{}".format(img_size)))

    print("====以下{}张图片有异常====".format(len(bad_img)))
    if bad_img:
        for b in bad_img:
            print("[第{}张图片] [{}] [{}]".format(b[0], b[1], b[2]))
    else:
        print("未发现异常(共 {} 张图片)".format(len(img_list)))
    print("========end\n")
    return bad_img


def split(origin_dir, train_dir, test_dir, bad_imgs):
    """Move the valid images of ``origin_dir`` into a test and a train set.

    5% of the valid images (rounded down) are picked at random and moved to
    ``test_dir``; the rest are moved to ``train_dir``. Files listed in
    ``bad_imgs`` are left in ``origin_dir``.

    :param origin_dir: directory containing the raw images
    :param train_dir: destination of the training images (created if missing)
    :param test_dir: destination of the test images (created if missing)
    :param bad_imgs: file names to exclude from both sets
    :return: None
    """
    print("开始分离原始图片集为:测试集(5%)和训练集(95%)")

    # File names eligible for distribution
    excluded = set(bad_imgs)
    img_list = [name for name in os.listdir(origin_dir) if name not in excluded]
    total_count = len(img_list)
    print("共分配{}张图片到训练集和测试集,其中{}张为异常留在原始目录".format(total_count, len(bad_imgs)))

    # makedirs handles nested paths and already existing directories.
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    # Test set: sampled without replacement
    test_count = int(total_count * 0.05)
    test_list = random.sample(img_list, test_count)
    print("测试集数量为:{}".format(len(test_list)))
    for file_name in test_list:
        shutil.move(os.path.join(origin_dir, file_name), os.path.join(test_dir, file_name))

    # Train set: everything that was not sampled
    chosen = set(test_list)
    train_list = [name for name in img_list if name not in chosen]
    print("训练集数量为:{}".format(len(train_list)))
    for file_name in train_list:
        shutil.move(os.path.join(origin_dir, file_name), os.path.join(train_dir, file_name))

    # listdir returns a list, so it must be tested for emptiness rather
    # than compared with the integer 0.
    if not os.listdir(origin_dir):
        print("migration done")


def main():
    """Validate the raw image set, then split the valid images into train and test sets."""
    # Directories
    origin_dir = sample_conf["origin_image_dir"]
    train_dir = sample_conf["train_image_dir"]
    test_dir = sample_conf["test_image_dir"]
    # Expected image size
    real_width = sample_conf["image_width"]
    real_height = sample_conf["image_height"]
    # Expected file suffix
    image_suffix = sample_conf["image_suffix"]

    bad_images_info = verify(origin_dir, real_width, real_height, image_suffix)
    # Each entry is (index, file_name, reason); only the file name is needed.
    bad_imgs = [info[1] for info in bad_images_info]
    split(origin_dir, train_dir, test_dir, bad_imgs)


if __name__ == "__main__":
    main()

train_model_v2.py 训练模型,训练过程中同时输出训练集和验证集的准确率

# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import time
from PIL import Image
import random
import os
from sample import sample_conf
from tensorflow.python.framework.errors_impl import NotFoundError


# 设置以下环境变量可开启CPU识别
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

class TrainError(Exception):
    """Raised when the training images cannot be used for training."""


class TrainModel(object):
    """Train a three-convolution-layer CNN on fixed-length captcha images.

    The label of an image is the part of its file name before the first
    ``_``. Image size and label length are probed from one training image.
    """

    def __init__(self, train_img_path, verify_img_path, char_set, model_save_dir, verify=False):
        """Load the file lists, probe one image and create the placeholders.

        :param train_img_path: directory holding the training images
        :param verify_img_path: directory holding the validation images
        :param char_set: ordered list of characters that may appear in a label
        :param model_save_dir: path handed to ``tf.train.Saver`` for save/restore
        :param verify: when True, check the suffix of every training file first
        :raises TrainError: if the training directory is empty, a suffix check
            fails, the probed image is neither 2-D nor 3-D, or there are fewer
            training images than the 100 used for the initial probe batch
        """
        # Model path
        self.model_save_dir = model_save_dir

        # Training files
        self.train_img_path = train_img_path
        self.train_images_list = os.listdir(train_img_path)
        if not self.train_images_list:
            # Fail with a domain error instead of an IndexError further down.
            raise TrainError("训练集目录为空: {}".format(train_img_path))

        # Optional suffix check
        if verify:
            self.confirm_image_suffix()

        # Shuffle the training files
        random.seed(time.time())
        random.shuffle(self.train_images_list)

        # Validation files
        self.verify_img_path = verify_img_path
        self.verify_images_list = os.listdir(verify_img_path)

        # Probe one image for size and label length
        label, captcha_array = self.gen_captcha_text_image(train_img_path, self.train_images_list[0])

        captcha_shape = captcha_array.shape
        captcha_shape_len = len(captcha_shape)
        if captcha_shape_len == 3:
            image_height, image_width, channel = captcha_shape
            self.channel = channel
        elif captcha_shape_len == 2:
            image_height, image_width = captcha_shape
        else:
            raise TrainError("图片转换为矩阵时出错,请检查图片格式")

        # Image size
        self.image_height = image_height
        self.image_width = image_width
        # Number of characters per captcha
        self.max_captcha = len(label)
        # Character classes
        self.char_set = char_set
        self.char_set_len = len(char_set)

        # Summary
        print("-->图片尺寸: {} X {}".format(image_height, image_width))
        print("-->验证码长度: {}".format(self.max_captcha))
        print("-->验证码共{}类 {}".format(self.char_set_len, char_set))
        print("-->使用训练集为 {}".format(train_img_path))
        print("-->使用验证集为 {}".format(verify_img_path))

        # Placeholders
        self.X = tf.placeholder(tf.float32, [None, image_height * image_width])  # flattened images
        self.Y = tf.placeholder(tf.float32, [None, self.max_captcha * self.char_set_len])  # one-hot labels
        self.keep_prob = tf.placeholder(tf.float32)  # dropout keep probability
        self.w_alpha = 0.01
        self.b_alpha = 0.1

        # Load one batch up front so data problems surface before training.
        print(">>> Start model test")
        batch_x, batch_y = self.get_batch(0, size=100)
        print(">>> input batch images shape: {}".format(batch_x.shape))
        print(">>> input batch labels shape: {}".format(batch_y.shape))

    @staticmethod
    def gen_captcha_text_image(img_path, img_name):
        """Load one captcha image.

        :param img_path: directory containing the image
        :param img_name: file name; the text before the first ``_`` is the label
        :return: tuple (str, numpy.array) of label and pixel array
        """
        label = img_name.split("_")[0]
        img_file = os.path.join(img_path, img_name)
        captcha_image = Image.open(img_file)
        captcha_array = np.array(captcha_image)
        return label, captcha_array

    @staticmethod
    def convert2gray(img):
        """Convert an image array to grayscale.

        Arrays with more than two dimensions are reduced with a weighted sum
        of their first three channels; 2-D arrays are returned unchanged.

        :param img: numpy array of shape (H, W) or (H, W, C) with C >= 3
        :return: 2-D numpy array
        """
        if len(img.shape) > 2:
            r, g, b = img[:, :, 0], img[:, :, 1], img[:, :, 2]
            gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
            return gray
        else:
            return img

    def text2vec(self, text):
        """Encode a label as concatenated one-hot vectors.

        :param text: str, at most ``max_captcha`` characters from ``char_set``
        :return: numpy.array of length ``max_captcha * char_set_len``
        :raises ValueError: if the text is too long or has an unknown character
        """
        text_len = len(text)
        if text_len > self.max_captcha:
            raise ValueError("验证码最长{}个字符".format(self.max_captcha))

        vector = np.zeros(self.max_captcha * self.char_set_len)

        for i, ch in enumerate(text):
            idx = i * self.char_set_len + self.char_set.index(ch)
            vector[idx] = 1
        return vector

    def _load_batch(self, img_dir, img_names, size):
        """Read ``img_names`` from ``img_dir`` into (features, labels) arrays with ``size`` rows."""
        batch_x = np.zeros([size, self.image_height * self.image_width])
        batch_y = np.zeros([size, self.max_captcha * self.char_set_len])
        for i, img_name in enumerate(img_names):
            label, image_array = self.gen_captcha_text_image(img_dir, img_name)
            image_array = self.convert2gray(image_array)
            batch_x[i, :] = image_array.flatten() / 255  # scale to [0, 1]
            batch_y[i, :] = self.text2vec(label)
        return batch_x, batch_y

    def get_batch(self, n, size=128):
        """Return the ``n``-th training batch, wrapping around past the last full batch.

        :param n: batch index
        :param size: number of images per batch
        :return: tuple (batch_x, batch_y) of numpy arrays
        :raises TrainError: if there are fewer than ``size`` training images
        """
        max_batch = int(len(self.train_images_list) / size)
        if max_batch < 1:
            raise TrainError("训练集图片数量需要大于每批次训练的图片数量")
        if n > max_batch - 1:
            n = n % max_batch
        s = n * size
        e = (n + 1) * size
        return self._load_batch(self.train_img_path, self.train_images_list[s:e], size)

    def get_verify_batch(self, size=100):
        """Return ``size`` validation images drawn at random with replacement.

        :param size: number of images in the batch
        :return: tuple (batch_x, batch_y) of numpy arrays
        :raises TrainError: if the validation directory is empty
        """
        if not self.verify_images_list:
            raise TrainError("验证集目录为空: {}".format(self.verify_img_path))
        verify_images = [random.choice(self.verify_images_list) for _ in range(size)]
        return self._load_batch(self.verify_img_path, verify_images, size)

    def confirm_image_suffix(self):
        """Check that every training file name ends with the configured suffix.

        :raises TrainError: on the first file with a different suffix
        """
        print("开始校验所有图片后缀")
        for index, img_name in enumerate(self.train_images_list):
            print("{} image pass".format(index), end="\r")
            if not img_name.endswith(sample_conf["image_suffix"]):
                raise TrainError("confirm images suffix:you request [.{}] file but get file [{}]"
                                 .format(sample_conf["image_suffix"], img_name))
        print("所有图片格式校验通过")

    def _conv_block(self, inputs, name, in_channels, out_channels):
        """Apply a 3x3 convolution with ReLU, 2x2 max-pooling and dropout."""
        weights = tf.get_variable(name=name, shape=[3, 3, in_channels, out_channels], dtype=tf.float32,
                                  initializer=tf.contrib.layers.xavier_initializer())
        biases = tf.Variable(self.b_alpha * tf.random_normal([out_channels]))
        conv = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(inputs, weights, strides=[1, 1, 1, 1], padding="SAME"), biases))
        conv = tf.nn.max_pool(conv, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
        return tf.nn.dropout(conv, self.keep_prob)

    def model(self):
        """Build the network and return the logits tensor.

        Variables are created in the same order as before (wc1, bias, wc2,
        bias, wc3, bias, wd1, bias, output weights, bias) so that the
        automatically named bias variables keep their names.

        :return: tensor of shape [batch, max_captcha * char_set_len]
        """
        x = tf.reshape(self.X, shape=[-1, self.image_height, self.image_width, 1])
        print(">>> input x: {}".format(x))

        # Convolution layers 1-3
        conv1 = self._conv_block(x, "wc1", 1, 32)
        conv2 = self._conv_block(conv1, "wc2", 32, 64)
        conv3 = self._conv_block(conv2, "wc3", 64, 128)
        print(">>> convolution 3: ", conv3.shape)
        next_shape = conv3.shape[1] * conv3.shape[2] * conv3.shape[3]

        # Fully connected layer 1
        wd1 = tf.get_variable(name="wd1", shape=[next_shape, 1024], dtype=tf.float32,
                              initializer=tf.contrib.layers.xavier_initializer())
        bd1 = tf.Variable(self.b_alpha * tf.random_normal([1024]))
        dense = tf.reshape(conv3, [-1, wd1.get_shape().as_list()[0]])
        dense = tf.nn.relu(tf.add(tf.matmul(dense, wd1), bd1))
        dense = tf.nn.dropout(dense, self.keep_prob)

        # Fully connected layer 2. The variable is literally called "name";
        # it is kept because the other model definitions in this project use
        # the same name when restoring the checkpoint.
        wout = tf.get_variable("name", shape=[1024, self.max_captcha * self.char_set_len], dtype=tf.float32,
                               initializer=tf.contrib.layers.xavier_initializer())
        bout = tf.Variable(self.b_alpha * tf.random_normal([self.max_captcha * self.char_set_len]))
        y_predict = tf.add(tf.matmul(dense, wout), bout)
        return y_predict

    def train_cnn(self):
        """Train for up to 3000 steps, reporting accuracy every 10 steps.

        The model is saved every 500 iterations, when the validation image
        accuracy exceeds 0.99 (which also stops training), and once more at
        the end.
        """
        y_predict = self.model()
        print(">>> input batch predict shape: {}".format(y_predict.shape))
        print(">>> End model test")

        # Loss
        cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_predict, labels=self.Y))

        # Optimizer
        optimizer = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)

        # Accuracy per character and per whole image
        predict = tf.reshape(y_predict, [-1, self.max_captcha, self.char_set_len])
        max_idx_p = tf.argmax(predict, 2)
        max_idx_l = tf.argmax(tf.reshape(self.Y, [-1, self.max_captcha, self.char_set_len]), 2)
        correct_pred = tf.equal(max_idx_p, max_idx_l)
        accuracy_char_count = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
        # An image counts as correct only if all of its characters are correct.
        accuracy_image_count = tf.reduce_mean(tf.reduce_min(tf.cast(correct_pred, tf.float32), axis=1))

        saver = tf.train.Saver()
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            # Resume from an existing model if there is one
            if os.path.exists(self.model_save_dir):
                try:
                    saver.restore(sess, self.model_save_dir)
                except ValueError:
                    # Nothing to restore at that path; train from scratch.
                    print("model文件夹为空,将创建新模型")
            step = 1
            for i in range(3000):
                batch_x, batch_y = self.get_batch(i, size=128)
                _, cost_ = sess.run([optimizer, cost],
                                    feed_dict={self.X: batch_x, self.Y: batch_y, self.keep_prob: 0.75})
                if step % 10 == 0:
                    # Evaluation on training data (both metrics in one run)
                    batch_x_test, batch_y_test = self.get_batch(i, size=100)
                    acc_char, acc_image = sess.run(
                        [accuracy_char_count, accuracy_image_count],
                        feed_dict={self.X: batch_x_test, self.Y: batch_y_test, self.keep_prob: 1.})
                    print("第{}次训练 >>> ".format(step))
                    print("[训练集] 字符准确率为 {:.5f} 图片准确率为 {:.5f} >>> loss {:.10f}".format(acc_char, acc_image, cost_))

                    # Evaluation on validation data, with the loss computed
                    # on the validation batch rather than reusing cost_.
                    batch_x_verify, batch_y_verify = self.get_verify_batch(size=100)
                    acc_char, acc_image, verify_cost = sess.run(
                        [accuracy_char_count, accuracy_image_count, cost],
                        feed_dict={self.X: batch_x_verify, self.Y: batch_y_verify, self.keep_prob: 1.})
                    print("[验证集] 字符准确率为 {:.5f} 图片准确率为 {:.5f} >>> loss {:.10f}".format(acc_char, acc_image, verify_cost))

                    # Save and stop once validation image accuracy passes 99%
                    if acc_image > 0.99:
                        saver.save(sess, self.model_save_dir)
                        print("验证集准确率达到99%,保存模型成功")
                        break

                # Periodic save
                if i % 500 == 0:
                    saver.save(sess, self.model_save_dir)
                    print("定时保存模型成功")
                step += 1
            saver.save(sess, self.model_save_dir)


def main():
    """Train the captcha model with the directories and character set from ``sample_conf``."""
    trainer = TrainModel(
        sample_conf["train_image_dir"],
        sample_conf["test_image_dir"],  # the test set doubles as the validation set
        sample_conf["char_set"],
        sample_conf["model_save_dir"],
        verify=False,
    )
    trainer.train_cnn()


if __name__ == "__main__":
    main()

训练结果

第2960次训练 >>> 
[训练集] 字符准确率为 0.87500 图片准确率为 0.61000 >>> loss 0.0337208398
[验证集] 字符准确率为 0.81500 图片准确率为 0.45000 >>> loss 0.0337208398
第2970次训练 >>> 
[训练集] 字符准确率为 0.88500 图片准确率为 0.62000 >>> loss 0.0343154743
[验证集] 字符准确率为 0.80750 图片准确率为 0.39000 >>> loss 0.0343154743
第2980次训练 >>> 
[训练集] 字符准确率为 0.89250 图片准确率为 0.65000 >>> loss 0.0298477933
[验证集] 字符准确率为 0.80000 图片准确率为 0.38000 >>> loss 0.0298477933
第2990次训练 >>> 
[训练集] 字符准确率为 0.90250 图片准确率为 0.71000 >>> loss 0.0316790938
[验证集] 字符准确率为 0.83500 图片准确率为 0.48000 >>> loss 0.0316790938
第3000次训练 >>> 
[训练集] 字符准确率为 0.89000 图片准确率为 0.69000 >>> loss 0.0330378339
[验证集] 字符准确率为 0.83750 图片准确率为 0.53000 >>> loss 0.0330378339

test_batch.py 批量验证

# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import time
from PIL import Image
import random
import os
from sample import sample_conf


class TestError(Exception):
    """Raised when the test images cannot be used for evaluation."""


class TestBatch(object):
    """Evaluate a saved captcha model on randomly chosen images of a directory.

    The label of an image is the part of its file name before the first
    ``_``. Image size and label length are probed from one random image.
    """

    def __init__(self, img_path, char_set, model_save_dir, total):
        """Load the file list, probe one image and create the placeholders.

        :param img_path: directory holding the test images
        :param char_set: ordered list of characters that may appear in a label
        :param model_save_dir: path handed to ``tf.train.Saver.restore``
        :param total: number of random samples to evaluate
        :raises TestError: if the directory is empty or the probed image is
            neither 2-D nor 3-D
        """
        # Model path
        self.model_save_dir = model_save_dir
        # Shuffled file list
        self.img_path = img_path
        self.img_list = os.listdir(img_path)
        if not self.img_list:
            # Fail with a domain error instead of an IndexError from random.choice.
            raise TestError("测试集目录为空: {}".format(img_path))
        random.seed(time.time())
        random.shuffle(self.img_list)

        # Probe one image for size and label length
        label, captcha_array = self.gen_captcha_text_image()

        captcha_shape = captcha_array.shape
        captcha_shape_len = len(captcha_shape)
        if captcha_shape_len == 3:
            image_height, image_width, channel = captcha_shape
            self.channel = channel
        elif captcha_shape_len == 2:
            image_height, image_width = captcha_shape
        else:
            raise TestError("图片转换为矩阵时出错,请检查图片格式")

        # Image size
        self.image_height = image_height
        self.image_width = image_width
        # Number of characters per captcha
        self.max_captcha = len(label)
        # Character classes
        self.char_set = char_set
        self.char_set_len = len(char_set)
        # Number of samples to evaluate
        self.total = total

        # Summary
        print("-->图片尺寸: {} X {}".format(image_height, image_width))
        print("-->验证码长度: {}".format(self.max_captcha))
        print("-->验证码共{}类 {}".format(self.char_set_len, char_set))
        print("-->使用测试集为 {}".format(img_path))

        # Placeholders
        self.X = tf.placeholder(tf.float32, [None, image_height * image_width])  # flattened images
        self.Y = tf.placeholder(tf.float32, [None, self.max_captcha * self.char_set_len])  # one-hot labels
        self.keep_prob = tf.placeholder(tf.float32)  # dropout keep probability
        self.w_alpha = 0.01
        self.b_alpha = 0.1

    def gen_captcha_text_image(self):
        """Load one randomly chosen test image.

        :return: tuple (str, numpy.array) of label and pixel array
        """
        img_name = random.choice(self.img_list)
        label = img_name.split("_")[0]
        img_file = os.path.join(self.img_path, img_name)
        captcha_image = Image.open(img_file)
        captcha_array = np.array(captcha_image)

        return label, captcha_array

    @staticmethod
    def convert2gray(img):
        """Convert an image array to grayscale.

        Arrays with more than two dimensions are reduced with a weighted sum
        of their first three channels; 2-D arrays are returned unchanged.

        :param img: numpy array of shape (H, W) or (H, W, C) with C >= 3
        :return: 2-D numpy array
        """
        if len(img.shape) > 2:
            r, g, b = img[:, :, 0], img[:, :, 1], img[:, :, 2]
            gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
            return gray
        else:
            return img

    def text2vec(self, text):
        """Encode a label as concatenated one-hot vectors.

        :param text: str, at most ``max_captcha`` characters from ``char_set``
        :return: numpy.array of length ``max_captcha * char_set_len``
        :raises ValueError: if the text is too long or has an unknown character
        """
        text_len = len(text)
        if text_len > self.max_captcha:
            raise ValueError("验证码最长{}个字符".format(self.max_captcha))

        vector = np.zeros(self.max_captcha * self.char_set_len)

        for i, ch in enumerate(text):
            idx = i * self.char_set_len + self.char_set.index(ch)
            vector[idx] = 1
        return vector

    def _decode(self, indices):
        """Map a sequence of class indices to the captcha text."""
        return "".join(str(self.char_set[p]) for p in indices)

    @staticmethod
    def _format_rate(right, total):
        """Format ``right / total`` as a percentage with two decimals."""
        if not total:
            return "0.00%"
        return "{:.2f}%".format(100.0 * right / total)

    def _conv_block(self, inputs, name, in_channels, out_channels):
        """Apply a 3x3 convolution with ReLU, 2x2 max-pooling and dropout."""
        weights = tf.get_variable(name=name, shape=[3, 3, in_channels, out_channels], dtype=tf.float32,
                                  initializer=tf.contrib.layers.xavier_initializer())
        biases = tf.Variable(self.b_alpha * tf.random_normal([out_channels]))
        conv = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(inputs, weights, strides=[1, 1, 1, 1], padding="SAME"), biases))
        conv = tf.nn.max_pool(conv, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
        return tf.nn.dropout(conv, self.keep_prob)

    def model(self):
        """Build the network and return the logits tensor.

        Variables are created in the same order as before (wc1, bias, wc2,
        bias, wc3, bias, wd1, bias, output weights, bias) so that the
        automatically named bias variables keep their names.

        :return: tensor of shape [batch, max_captcha * char_set_len]
        """
        x = tf.reshape(self.X, shape=[-1, self.image_height, self.image_width, 1])
        print(">>> input x: {}".format(x))

        # Convolution layers 1-3
        conv1 = self._conv_block(x, "wc1", 1, 32)
        conv2 = self._conv_block(conv1, "wc2", 32, 64)
        conv3 = self._conv_block(conv2, "wc3", 64, 128)
        print(">>> convolution 3: ", conv3.shape)
        next_shape = conv3.shape[1] * conv3.shape[2] * conv3.shape[3]

        # Fully connected layer 1
        wd1 = tf.get_variable(name="wd1", shape=[next_shape, 1024], dtype=tf.float32,
                              initializer=tf.contrib.layers.xavier_initializer())
        bd1 = tf.Variable(self.b_alpha * tf.random_normal([1024]))
        dense = tf.reshape(conv3, [-1, wd1.get_shape().as_list()[0]])
        dense = tf.nn.relu(tf.add(tf.matmul(dense, wd1), bd1))
        dense = tf.nn.dropout(dense, self.keep_prob)

        # Fully connected layer 2. The variable is literally called "name";
        # it is kept so the saved checkpoint can still be restored.
        wout = tf.get_variable("name", shape=[1024, self.max_captcha * self.char_set_len], dtype=tf.float32,
                               initializer=tf.contrib.layers.xavier_initializer())
        bout = tf.Variable(self.b_alpha * tf.random_normal([self.max_captcha * self.char_set_len]))
        y_predict = tf.add(tf.matmul(dense, wout), bout)
        return y_predict

    def test_batch(self):
        """Predict ``total`` randomly chosen images and print the accuracy.

        Each prediction is printed next to its label, followed by the number
        of exact matches, the elapsed time and the accuracy in percent.
        """
        y_predict = self.model()
        total = self.total
        right = 0

        # Create the argmax op once. Building it inside the loop would add a
        # new node to the graph for every sample.
        predict = tf.argmax(tf.reshape(y_predict, [-1, self.max_captcha, self.char_set_len]), 2)

        saver = tf.train.Saver()
        with tf.Session() as sess:
            saver.restore(sess, self.model_save_dir)
            s = time.time()
            for i in range(total):
                test_text, test_image = self.gen_captcha_text_image()  # random sample
                test_image = self.convert2gray(test_image)
                test_image = test_image.flatten() / 255

                text_list = sess.run(predict, feed_dict={self.X: [test_image], self.keep_prob: 1.})
                p_text = self._decode(text_list[0].tolist())
                print("origin: {} predict: {}".format(test_text, p_text))
                if test_text == p_text:
                    right += 1
            e = time.time()
        # Scale to a percentage; the raw fraction followed by "%" was misleading.
        rate = self._format_rate(right, total)
        print("测试结果: {}/{}".format(right, total))
        print("{}个样本识别耗时{}秒,准确率{}".format(total, e - s, rate))


def main():
    """Evaluate the saved model on 100 random images from the configured test directory."""
    total = 100
    tester = TestBatch(
        sample_conf["test_image_dir"],
        sample_conf["char_set"],
        sample_conf["model_save_dir"],
        total,
    )
    tester.test_batch()


if __name__ == "__main__":
    main()

程序结果

origin: 4958 predict: 4958
origin: 0409 predict: 0409
origin: 1328 predict: 1228
origin: 6181 predict: 6181
origin: 7017 predict: 7002
origin: 5355 predict: 5355
origin: 1780 predict: 7180
origin: 4122 predict: 4122
测试结果: 46/100
100个样本识别耗时3.113262891769409秒,准确率0.46%

封装识别类

# -*- coding: utf-8 -*-
"""
识别图像的类,为了快速进行多次识别可以调用此类下面的方法:
R = Recognizer(image_height, image_width, max_captcha, char_set, model_save_dir)
for i in range(10):
    r_img = Image.open(str(i) + ".jpg")
    t = R.rec_image(r_img)
简单的图片每张基本上可以达到毫秒级的识别速度
"""
import tensorflow as tf
import numpy as np
from PIL import Image
from sample import sample_conf


class Recognizer(object):
    """Recognise captcha images with a saved model, reusing one graph and session.

    The graph is built and the weights are restored once in ``__init__``;
    ``rec_image`` then only runs the prediction op. Call ``close()`` (or use
    the instance as a context manager) to release the session.
    """

    def __init__(self, image_height, image_width, max_captcha, char_set, model_save_dir):
        """Build the network in a private graph and restore the saved weights.

        :param image_height: height of the images to recognise
        :param image_width: width of the images to recognise
        :param max_captcha: number of characters per captcha
        :param char_set: ordered list of characters the model can output
        :param model_save_dir: path handed to ``tf.train.Saver.restore``
        """
        self.w_alpha = 0.01
        self.b_alpha = 0.1
        self.image_height = image_height
        self.image_width = image_width
        self.max_captcha = max_captcha
        self.char_set = char_set
        self.char_set_len = len(self.char_set)
        self.model_save_dir = model_save_dir

        # Dedicated graph and session, so several recognizers can coexist
        self.g = tf.Graph()
        self.sess = tf.Session(graph=self.g)
        with self.g.as_default():
            # All tensors are defined here, once. Creating ops per call
            # would keep growing the graph and slow recognition down.
            self.X = tf.placeholder(tf.float32, [None, self.image_height * self.image_width])  # flattened images
            self.Y = tf.placeholder(tf.float32, [None, self.max_captcha * self.char_set_len])  # one-hot labels
            self.keep_prob = tf.placeholder(tf.float32)  # dropout keep probability
            # Network and prediction op
            self.y_predict = self.model()
            self.predict = tf.argmax(tf.reshape(self.y_predict, [-1, self.max_captcha, self.char_set_len]), 2)
            saver = tf.train.Saver()
            saver.restore(self.sess, self.model_save_dir)

    def close(self):
        """Close the TensorFlow session held by this recognizer."""
        self.sess.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    @staticmethod
    def convert2gray(img):
        """Convert an image array to grayscale.

        Arrays with more than two dimensions are reduced with a weighted sum
        of their first three channels; 2-D arrays are returned unchanged.

        :param img: numpy array of shape (H, W) or (H, W, C) with C >= 3
        :return: 2-D numpy array
        """
        if len(img.shape) > 2:
            r, g, b = img[:, :, 0], img[:, :, 1], img[:, :, 2]
            gray = 0.2989 * r + 0.5870 * g + 0.1140 * b
            return gray
        else:
            return img

    def text2vec(self, text):
        """Encode a label as concatenated one-hot vectors.

        :param text: str, at most ``max_captcha`` characters from ``char_set``
        :return: numpy.array of length ``max_captcha * char_set_len``
        :raises ValueError: if the text is too long or has an unknown character
        """
        text_len = len(text)
        if text_len > self.max_captcha:
            raise ValueError("验证码最长{}个字符".format(self.max_captcha))

        vector = np.zeros(self.max_captcha * self.char_set_len)

        for i, ch in enumerate(text):
            idx = i * self.char_set_len + self.char_set.index(ch)
            vector[idx] = 1
        return vector

    def _conv_block(self, inputs, name, in_channels, out_channels):
        """Apply a 3x3 convolution with ReLU, 2x2 max-pooling and dropout."""
        weights = tf.get_variable(name=name, shape=[3, 3, in_channels, out_channels], dtype=tf.float32,
                                  initializer=tf.contrib.layers.xavier_initializer())
        biases = tf.Variable(self.b_alpha * tf.random_normal([out_channels]))
        conv = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(inputs, weights, strides=[1, 1, 1, 1], padding="SAME"), biases))
        conv = tf.nn.max_pool(conv, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
        return tf.nn.dropout(conv, self.keep_prob)

    def model(self):
        """Build the network and return the logits tensor.

        Variables are created in the same order as before (wc1, bias, wc2,
        bias, wc3, bias, wd1, bias, output weights, bias) so that the
        automatically named bias variables keep their names.

        :return: tensor of shape [batch, max_captcha * char_set_len]
        """
        x = tf.reshape(self.X, shape=[-1, self.image_height, self.image_width, 1])
        print(">>> input x: {}".format(x))

        # Convolution layers 1-3
        conv1 = self._conv_block(x, "wc1", 1, 32)
        conv2 = self._conv_block(conv1, "wc2", 32, 64)
        conv3 = self._conv_block(conv2, "wc3", 64, 128)
        print(">>> convolution 3: ", conv3.shape)
        next_shape = conv3.shape[1] * conv3.shape[2] * conv3.shape[3]

        # Fully connected layer 1
        wd1 = tf.get_variable(name="wd1", shape=[next_shape, 1024], dtype=tf.float32,
                              initializer=tf.contrib.layers.xavier_initializer())
        bd1 = tf.Variable(self.b_alpha * tf.random_normal([1024]))
        dense = tf.reshape(conv3, [-1, wd1.get_shape().as_list()[0]])
        dense = tf.nn.relu(tf.add(tf.matmul(dense, wd1), bd1))
        dense = tf.nn.dropout(dense, self.keep_prob)

        # Fully connected layer 2. The variable is literally called "name";
        # it is kept so the saved checkpoint can still be restored.
        wout = tf.get_variable("name", shape=[1024, self.max_captcha * self.char_set_len], dtype=tf.float32,
                               initializer=tf.contrib.layers.xavier_initializer())
        bout = tf.Variable(self.b_alpha * tf.random_normal([self.max_captcha * self.char_set_len]))
        y_predict = tf.add(tf.matmul(dense, wout), bout)
        return y_predict

    def rec_image(self, img):
        """Recognise one captcha image.

        :param img: image object convertible with ``numpy.array`` (e.g. a PIL
            image) whose size matches the configured height and width
        :return: str, the predicted captcha text
        """
        # Preprocess: grayscale, flatten, scale to [0, 1]
        img_array = np.array(img)
        test_image = self.convert2gray(img_array)
        test_image = test_image.flatten() / 255
        # The session is bound to self.g, so it can be run directly.
        text_list = self.sess.run(self.predict, feed_dict={self.X: [test_image], self.keep_prob: 1.})

        # Map the class indices back to characters
        return "".join(str(self.char_set[p]) for p in text_list[0].tolist())


def main():
    """Build a Recognizer from ``sample_conf`` and print its prediction for one sample image."""
    recognizer = Recognizer(
        sample_conf["image_height"],
        sample_conf["image_width"],
        sample_conf["max_captcha"],
        sample_conf["char_set"],
        sample_conf["model_save_dir"],
    )
    sample_image = Image.open("./sample/test/0059_15553933348531582.png")
    print(recognizer.rec_image(sample_image))


# Run the single-image demo only when this file is executed as a script
if __name__ == \'__main__\':
    main()

使用flask写的提供在线识别功能的接口

# -*- coding: UTF-8 -*-
"""
Flask service that exposes the captcha recognizer over HTTP.
Recognizes the captcha received as files={"image_file": ("captcha.jpg", BytesIO(bytes), "application")}.
Parameters that must be configured (example values):
    image_height = 40
    image_width = 80
    max_captcha = 4
NOTE(review): the values actually used are read from sample_conf when this module is loaded.
"""
import json
from io import BytesIO
import os
from recognition_object import Recognizer

import time
from flask import Flask, request, jsonify, Response
from PIL import Image
from sample import sample_conf

# Use the CPU by default: CUDA_VISIBLE_DEVICES="-1" exposes no GPU to this process
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# Configuration values read from sample_conf
image_height = sample_conf["image_height"]
image_width = sample_conf["image_width"]
max_captcha = sample_conf["max_captcha"]
api_image_dir = sample_conf["api_image_dir"]
model_save_dir = sample_conf["model_save_dir"]
image_suffix = sample_conf["image_suffix"]  # file suffix
char_set = sample_conf["char_set"]

# Flask application object
app = Flask(__name__)
basedir = os.path.abspath(os.path.dirname(__file__))

# Build the recognizer object from the configuration values above
R = Recognizer(image_height, image_width, max_captcha, char_set, model_save_dir)

# To serve several models, add routes and handlers following the existing example
# Q = Recognizer(image_height, image_width, max_captcha, char_set, model_save_dir)


def response_headers(content):
    """Wrap ``content`` in a ``Response`` whose Access-Control-Allow-Origin header is ``*``."""
    cors_response = Response(content)
    cors_response.headers["Access-Control-Allow-Origin"] = "*"
    return cors_response


@app.route("/b", methods=["POST"])
def up_image():
    """Recognize an uploaded captcha image and return the prediction as JSON.

    Expects a multipart/form-data POST carrying the picture in the
    ``image_file`` field. The picture is passed to the module-level recognizer
    ``R``, saved under ``api_image_dir`` as
    ``<prediction>_<timestamp>.<image_suffix>``, and a JSON document with the
    keys ``time``, ``value`` and ``speed_time(ms)`` is returned.

    When ``image_file`` is missing or empty the body is
    ``{"error_code": "1001"}``, sent through ``response_headers``.
    """
    # The route only accepts POST, so the upload itself is all that needs checking.
    upload = request.files.get("image_file")
    if not upload:
        return response_headers(json.dumps({"error_code": "1001"}))

    timec = str(time.time()).replace(".", "")
    # The context manager closes the image even if recognition or saving raises.
    with Image.open(BytesIO(upload.read()), mode="r") as img:
        print("接收图片尺寸: {}".format(img.size))
        started = time.time()
        value = R.rec_image(img)
        finished = time.time()
        print("识别结果: {}".format(value))

        # Keep a copy of the picture, named after the prediction and timestamp.
        file_name = "{}_{}.{}".format(value, timec, image_suffix)
        # Join directory and file name as separate components so the path is
        # correct whether or not api_image_dir ends with a separator.
        file_path = os.path.join(api_image_dir, file_name)
        # Saving must not fail just because the target folder is missing.
        os.makedirs(api_image_dir, exist_ok=True)
        print("保存图片: {}".format(file_path))
        img.save(file_path)

    result = {
        "time": timec,   # timestamp
        "value": value,  # predicted text
        "speed_time(ms)": int((finished - started) * 1000)  # recognition time
    }
    return jsonify(result)


# Start the built-in Flask server on port 9999 when run as a script.
# NOTE(review): debug=True turns on Flask's debug mode; switch it off before
# exposing this service beyond local testing.
if __name__ == \'__main__\':
    app.run(debug=True, port=9999)

为了测试这个接口是否工作正常,还得写一个页面

<!-- Test page: posts the chosen picture to the recognition endpoint as the image_file field -->
<form method="post" action="http://127.0.0.1:9999/b" enctype="multipart/form-data">
    <div>
        <input type="file" name="image_file"/>
        <!-- input is a void element, so the button label belongs in the value attribute -->
        <input type="submit" value="提交"/>
    </div>
</form>

测试结果

{
  "speed_time(ms)": 13, 
  "time": "15553999504148507", 
  "value": "0069"
}