python selenium结合PhantomJS对ajax异步页面进行压测或者爬虫

  本人的程序是在mac上写的,windows的话可能略有不同主要是PhantomJS的路径上。首先要下载PhantomJS,然后创建一个到/usr/bin/phantomsjs的软链。为什么用selenium和PhantomJS是因为,公司是做电商的,页面很多都是ajax异步渲染出来的,使用urllib或者requests是无法渲染异步页面的,而PhantomJS是一个没有界面的浏览器,使用webkit浏览器内核(号称Safari也是用这个内核)。可以完全模拟渲染和点击拖动等动作。下面是我的主要代码,涉及到一些用户密码我都去掉了,代码就放在下面,其实也不难看懂,从登录,商品页面,下单,最后交易等。可以根据自己的需求添加,好像selenium是有自带的页面检查机制的,我找了下没找到就自己写了个,主要就是那个 while wait_browser。

# -*- coding: utf-8 -*-
from selenium import webdriver
import time
import re
from monitor_business import business
import requests
import random
from user_config import username_info
from selenium.common.exceptions import NoSuchElementException

class autocreateorder():
    def __init__(self,_username,_loginurl):
        self.username=_username
        self.loginurl=_loginurl
    def login(self):
        try:
            self.browser = webdriver.PhantomJS(desired_capabilities={'phantomjs.page.settings.resourceTimeout': '15000'})
        except Exception:
            print "PhantomJS init fail"
        self.browser.get(self.loginurl)
        try:
            #print "正在登录"
            browserlogin = self.browser.find_element_by_class_name("j_login-form")
            browserusername = self.browser.find_element_by_id("username-input")
            browserusername.click()
            browserusername.clear()
            browserusername.send_keys(self.username)
            browserpassword = self.browser.find_element_by_id("password-input")
            browserpassword.click()
            browserpassword.clear()
            browserpassword.send_keys(u'xxxxx')
            browserlogin.submit()
            #print "登录成功"
        except NoSuchElementException:
            print "PhantomJS login fail"
            return "fail"
    def productaddcart(self,_producturlurl,timeout,jscmd=None):
        if _producturlurl:
            self.browser.get(_producturlurl)
            print "open product url"
            time.sleep(timeout)
            self.browser.execute_script(jscmd)
            n=0
            wait_browser=True
            while wait_browser:
                try:
                    self.browser.find_element_by_class_name("joy-ui-dialog-alert-window")
                    wait_browser=False
                    #print "stop product url check"
                except NoSuchElementException:
                    if n == 11:
                        print "add cart timeout"
                        self.browser.quit()
                        return "showproductfail"
                    else:
                        #print "wait time to check product url"
                        time.sleep(1)
                        n=n+1                      
                    
        else:
            print "please config product url"
    def showcart(self):
        self.browser.get("http://shop.m.xxxxx.net/shopCart.html")            
        #browser.get_screenshot_as_file("/Users/zhangsongbin/Desktop/screenshot1.png")
        print "show cart"
        n=0
        wait_browser=True
        while wait_browser:
            try:
                self.browser.find_element_by_class_name("order-submit")
                wait_browser=False
            except NoSuchElementException:
                if n == 11:
                    print "show cart timeout"
                    self.browser.quit()
                    return "showcartfail"
                else:
                    time.sleep(1)
                    n=n+1
    def createorder(self):
        jscmd="$('.j_Submit').trigger('click');"
        self.browser.execute_script(jscmd)   
        #browser.get_screenshot_as_file("/Users/zhangsongbin/Desktop/screenshot1.png")
        print "click cart make order"
        n=0
        wait_browser=True
        while wait_browser:
            try:
                self.browser.find_element_by_class_name("j_Submit")
                wait_browser=False
            except NoSuchElementException:
                if n == 11:
                    print "confirm order"
                    self.browser.quit()
                    return "createorderfail"
                else:
                    time.sleep(1)
                    n=n+1
        #browser.get_screenshot_as_file("/Users/zhangsongbin/Desktop/确认订单.png")  
        time.sleep(5)   
        jscmd="$('.j_Submit').trigger('click');"
        self.browser.execute_script(jscmd)
    def checkcommission(self):
        wait_browser=True
        n=0
        while wait_browser:
            check_order_number=re.findall('<input type="hidden" value="(\d+)" name="orderNumber" class="j_OrderNumber">',self.browser.page_source)
            if check_order_number:
                wait_browser=False
                order_number=check_order_number[0]
                self.browser.quit()
                requests.get("http://shop.m.xxxxx.net/Fake_Pay?orderNumber="+str(order_number))
                time.sleep(30)
                check_commission=business.monitor_commission(order_number)#订单的状态检查。
                if check_commission == "order_fail":
                    print str(order_number)+":commission missing"
                else:
                    print str(order_number)+":commission ok"
            elif n == 11:
                print "pay url timeout"
                self.browser.quit()
                return "payfail"
            else:
                time.sleep(1)
                n=n+1
                
                
def makeorder():
    stratint=0
    endint=len(username_info)
    inputusername=username_info[random.randrange(stratint,endint)]#我在其他文件中有个用户名的集合,这里是随机调用一个用户出来登录
    order=autocreateorder(inputusername,"http://login.xxxxx.net/?service=http%3A%2F%2Fm.xxxxx.net&redirect_uri=%2Fm%2F")
    order.login()
    order.productaddcart("http://shop.m.xxxxx.net/shop/sku/18613.html",10,jscmd="$('.j_AddCart').trigger('click');")
    order.showcart()
    order.createorder()
    order.checkcommission()
def makemutilorder():
    stratint=0
    endint=len(username_info)
    inputusername=username_info[random.randrange(stratint,endint)]
    order=autocreateorder(inputusername,"http://login.xxxxx.net/?service=http%3A%2F%2Fm.xxxxx.net&redirect_uri=%2Fm%2F")
    order.login()
    productlist=["http://shop.m.xxxxx.net/shop/sku/18613.html","http://shop.m.xxxxx.net/shop/sku/146991.html","http://shop.m.xxxxx.net/shop/sku/149104.html"]
    for url in productlist:
        productpage=order.productaddcart(url,15,jscmd="$('.j_AddCart').trigger('click');")
        if productpage == "showproductfail":
            print "showproductfail"
    order.showcart()
    order.createorder()
    order.checkcommission()

下面在放一个调用上面这段代码的程序,可以用来做压测,前提是找台好点的linux服务器,因为这个不是html的爬虫,涉及到完全真实的渲染(当然了,如果没有异步的玩意,谁愿意用这个啊,压测请求接口不是更好?,除了测试人员)

# -*- coding: utf-8 -*-

import time
import threading
import multiprocessing
from webkit_ghost_test import makeorder,makemutilorder
    
def performancetest(_groups,_interval,_members):
    _groups=int(_groups)
    while _groups > 0:
        for i in range(_members):#一组下单多少次
            make = multiprocessing.Process(target=makemutilorder)
            make.start()
        #for n in tsk:
            #n.join()
        #print "组间下单:倒叙"+str(_groups)+"次"
        _groups=_groups-1
        time.sleep(_interval)

if __name__ == '__main__':
    url="http://m.xxxxx.net"
    ordertimes=float(10) #规定时间内下单次数,后面要有小数
    times=float(1) #以秒为单位规定测试时长,后面要有小数
    groups=float(1) #组内单位是同时下单,组间单位是间隔下单
    members=int(ordertimes/groups)#组内下单数量
    interval=times/groups #组间间隔时间
    print groups,interval,members
    time_wait = threading.Event() 
    performancetest(groups,interval,members)