PHP 真正多线程的使用

PHP 5.3 以上版本,使用pthreads PHP扩展,可以使PHP真正地支持多线程。多线程在处理重复性的循环任务,能够大大缩短程序执行时间。

  我之前的文章中说过,大多数网站的性能瓶颈不在PHP服务器上,因为它可以简单地通过横向增加服务器或CPU核数来轻松应对(对于各种云主机,增加VPS或CPU核数就更方便了,直接以备份镜像增加VPS,连操作系统、环境都不用安装配置),而是在于MySQL数据库。如果用 MySQL 数据库,一条联合查询的SQL,也许就可以处理完业务逻辑,但是,遇到大量并发请求,就歇菜了。如果用 NoSQL 数据库,也许需要十次查询,才能处理完同样地业务逻辑,但每次查询都比 MySQL 要快,十次循环NoSQL查询也许比一次MySQL联合查询更快,应对几万次/秒的查询完全没问题。如果加上PHP多线程,通过十个线程同时查询NoSQL,返回结果汇总输出,速度就要更快了。我们实际的APP产品中,调用一个通过用户喜好实时推荐商品的PHP接口,PHP需要对BigSea NoSQL数据库发起500~1000次查询,来实时算出用户的个性喜好商品数据,PHP多线程的作用非常明显。

  PHP扩展下载:https://github.com/krakjoe/pthreads

  PHP手册文档:http://php.net/manual/zh/book.pthreads.php

pthreads 是一组允许用户在 PHP 中使用多线程技术的面向对象的 API。 它提供了创建多线程应用所需的全套工具,无论是 Web 应用还是控制台应用。 通过使用 Thread, Worker 以及 Threaded 对象,PHP 应用可以创建、读取、写入以及执行多线程应用,并可以在多个线程之间进行同步控制。

Threaded 对象: Threaded 对象提供支持 pthreads 操作的基本功能,包括同步方法以及其他对程序员很有帮助的接口。

Thread 对象: 通过继承 pthreads 中提供的 Thread 对象并实现 run 方法,用户可以创建自己的 Thread 对象。 只要线程上下文中持有某个 Thread 对象的引用,就可以读/写该对象的属性,也可以调用该对象的公有(public)或者受保护(protected)的方法。 当在创建 Thread 对象的进程或线程上下文中调用该对象的 start 方法时,pthreads 扩展会在另外的独立线程中执行该对象的 run 方法。 仅有创建 Thread 对象的线程/进程方可开始(start)或者加入(join)这个 Thread 对象。

Worker 对象: Worker 是有状态的线程对象,它在线程开始之后就可用,除非代码中显式地关闭线程,否则该对象在线程对象超出作用范围之后才会失效。 持有 Worker 对象引用的线程上下文可以向 Worker 中入栈其他线程对象,Worker 对象将在独立线程中执行入栈对象的代码。 Woker 对象的 run 方法会在它的栈中入栈对象之前执行,这样就可以进行一些必需的资源初始化工作。

Pool 对象: Pool 对象是 Worker 线程对象池,可以用来在多个 Worker 对象之间分发 Threaded 对象,它可以很好的处理对象应用。 Pool 对象从 1.0.0 版本开始引入,现在已经成为最易用且高效多线程编程方式。

Caution

Pool 是标准 PHP 对象,所以不可以在多个线程上下文中共享同一个 Pool 对象。

线程间同步: 由 pthreads 扩展创建的所有对象拥有内置的线程间同步机制,和 Java 语言很类似,使用 ::wait 和 :: notify 方法。 调用某一个对象的 ::wait 方法会导致当前线程上下文进入等待状态,等待另外一个线程上下文调用同一个对象的 ::notify 方法。 为 PHP Threaded 对象提供了强有力的线程间同步控制机制。

Caution

应用中会用在多线程场景中的对象都应该从 Threaded 类继承。

方法修饰符: Threaded 对象中的受保护方法(protected)是被 pthreads 保护的,也就是说,在同一时间,只有一个线程可以访问该方法。 在执行过程中,私有方法(private)只能被该对象本身调用。

数据存储: 一般来说,任何可以序列化的数据类型都可以作为 Threaded 对象的属性,它可以从持有该对象引用的任何线程上下文读/写。 并不是所有的数据都采用序列化方式存储,比如基本类型就是以其真实形态存储的。 对于不是 Threaded 派生的对象,例如复杂类型、数组以及对象等,都是序列化存储的,可以从持有 Threaded 对象引用的任何线程上下文中读取和写入, 区别就在于对于 Threaed 的派生对象,设置它的成员变量的过程是在独立线程上下文中执行的。 对于 Threaded 派生对象,在同一时间,不同的线程上下文都可以从该对象中读取到同样的数据。

静态成员: 当创建新的线程上下文(Thread 或 Worker 对象)的时候,静态成员会被拷贝到新的上下文中。出于安全考虑,资源类型以及包含内部状态的对象类型的静态成员会被置空。 实际上这个特性实现了类似线程本地存储的功能。举例说明,假设某个类拥有包含数据库连接信息以及数据库连接对象静态成员, 那么当新的线程上下文启动的时候,仅有数据库连接信息会被复制到新上下文中,而数据库连接对象并不会被复制。 所以,需要在新的上下文中根据复制过来的数据库连接基本信息来初始化数据库连接对象,新创建的数据库连接对象是独立的, 不影响在原上下文中的数据库连接对象。

Caution

当使用 print_r, var_dump 或者其他函数来进行对象调试的时候,是没有递归保护机制的。

Note:

资源类型: PHP 中很多使用到 Resource 资源类型的扩展或函数并未针对多线程场景进行特殊设计,也就是说,虽然 pthreads 扩展提供了 在多个线程上下文中共享资源类型变量的能力,但是通常来说,你应该把它们视为非线程安全的。 所以,如果要在多个线程上下文中共享资源类型的变量,你应该特别谨慎对待。

Caution

为了提供一个稳定的运行环境,pthreads 扩展在执行过程中会有一些必需的额外限制。

  1、扩展的编译安装(Linux),编辑参数 --enable-maintainer-zts 是必选项:

 1 cd /Data/tgz/php-5.5.1
 2 ./configure --prefix=/Data/apps/php --with-config-file-path=/Data/apps/php/etc --with-mysql=/Data/apps/mysql --with-mysqli=/Data/apps/mysql/bin/mysql_config --with-iconv-dir --with-freetype-dir=/Data/apps/libs --with-jpeg-dir=/Data/apps/libs --with-png-dir=/Data/apps/libs --with-zlib --with-libxml-dir=/usr --enable-xml --disable-rpath --enable-bcmath --enable-shmop --enable-sysvsem --enable-inline-optimization --with-curl --enable-mbregex --enable-fpm --enable-mbstring --with-mcrypt=/Data/apps/libs --with-gd --enable-gd-native-ttf --with-openssl --with-mhash --enable-pcntl --enable-sockets --with-xmlrpc --enable-zip --enable-soap --enable-opcache --with-pdo-mysql --enable-maintainer-zts
 3 make clean
 4 make
 5 make install        
 6 
 7 unzip pthreads-master.zip
 8 cd pthreads-master
 9 /Data/apps/php/bin/phpize
10 ./configure --with-php-config=/Data/apps/php/bin/php-config
11 make
12 make install

vi /Data/apps/php/etc/php.ini extension = "pthreads.so"

 1 <?php  
 2   class test_thread_run extends Thread   
 3   {  
 4       public $url;  
 5       public $data;  
 6   
 7       public function __construct($url)  
 8       {  
 9           $this->url = $url;  
10       }  
11   
12       public function run()  
13       {  
14           if(($url = $this->url))  
15           {  
16               $this->data = model_http_curl_get($url);  
17           }  
18       }  
19   }  
20   
21   function model_thread_result_get($urls_array)   
22   {  
23       foreach ($urls_array as $key => $value)   
24       {  
25           $thread_array[$key] = new test_thread_run($value["url"]);  
26           $thread_array[$key]->start();  
27       }  
28   
29       foreach ($thread_array as $thread_array_key => $thread_array_value)   
30       {  
31           while($thread_array[$thread_array_key]->isRunning())  
32           {  
33               usleep(10);  
34           }  
35           if($thread_array[$thread_array_key]->join())  
36           {  
37               $variable_data[$thread_array_key] = $thread_array[$thread_array_key]->data;  
38           }  
39       }  
40       return $variable_data;  
41   }  
42   
43   function model_http_curl_get($url,$userAgent="")   
44   {  
45       $userAgent = $userAgent ? $userAgent : 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.2)';   
46       $curl = curl_init();  
47       curl_setopt($curl, CURLOPT_URL, $url);  
48       curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);  
49       curl_setopt($curl, CURLOPT_TIMEOUT, 5);  
50       curl_setopt($curl, CURLOPT_USERAGENT, $userAgent);  
51       $result = curl_exec($curl);  
52       curl_close($curl);  
53       return $result;  
54   }  
55   
56   for ($i=0; $i < 100; $i++)   
57   {   
58       $urls_array[] = array("name" => "baidu", "url" => "http://www.baidu.com/s?wd=".mt_rand(10000,20000));  
59   }  
60   
61   $t = microtime(true);  
62   $result = model_thread_result_get($urls_array);  
63   $e = microtime(true);  
64   echo "多线程:".($e-$t)."\n";  
65   
66   $t = microtime(true);  
67   foreach ($urls_array as $key => $value)   
68   {  
69       $result_new[$key] = model_http_curl_get($value["url"]);  
70   }  
71   $e = microtime(true);  
72   echo "For循环:".($e-$t)."\n";  
73 ?>  

thinkphp例子

 1 <?php
 2 namespace Home\Controller;
 3 class test extends \Thread {
 4     public $url;
 5     public $result;
 6 
 7     public function __construct($url) {
 8         $this->url = $url;
 9     }
10 
11     public function run() {
12         if ($this->url) {
13             $this->result = model_http_curl_get($this->url);
14         }
15     }
16 }
17 function model_http_curl_get($url) {
18     $curl = curl_init();
19     curl_setopt($curl, CURLOPT_URL, $url);
20     curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
21     curl_setopt($curl, CURLOPT_TIMEOUT, 5);
22     curl_setopt($curl, CURLOPT_USERAGENT, 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.2)');
23     $result = curl_exec($curl);
24     curl_close($curl);
25     return $result;
26 }
27 for ($i = 0; $i < 10; $i++) {
28     $urls[] = 'http://www.baidu.com/s?wd=' . rand(10000, 20000);
29 }
30 /* 多线程速度测试 */
31 $t = microtime(true);
32 foreach ($urls as $key => $url) {
33     $workers[$key] = new test($url);
34     $workers[$key]->start();
35 }
36 foreach ($workers as $key => $worker) {
37     while ($workers[$key]->isRunning()) {
38         usleep(100);
39     }
40     if ($workers[$key]->join()) {
41         dump($workers[$key]->result);
42     }
43 }
44 $e = microtime(true);
45 echo "多线程耗时:" . ($e - $t) . "秒<br>";
46 /* 单线程速度测试 */
47 $t = microtime(true);
48 foreach ($urls as $key => $url) {
49     dump(model_http_curl_get($url));
50 }
51 $e = microtime(true);
52 echo "For循环耗时:" . ($e - $t) . "秒<br>";