nginx+lua访问流量实时上报kafka

在nginx这一层,接收到访问请求的时候,就把请求的流量上报发送给kafka

storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计

从lua脚本直接创建一个kafka producer,发送数据到kafka

wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

yum install -y unzip

unzip lua-resty-kafka-master.zip

cp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualib
nginx -s reload

  lua脚本:

local cjson = require("cjson")  
local producer = require("resty.kafka.producer")  

local broker_list = {  
    { host = "192.168.31.187", port = 9092 },  
    { host = "192.168.31.19", port = 9092 },  
    { host = "192.168.31.227", port = 9092 }
}

local log_json = {}  
log_json["headers"] = ngx.req.get_headers()  
log_json["uri_args"] = ngx.req.get_uri_args()  
log_json["body"] = ngx.req.read_body()  
log_json["http_version"] = ngx.req.http_version()  
log_json["method"] =ngx.req.get_method() 
log_json["raw_reader"] = ngx.req.raw_header()  
log_json["body_data"] = ngx.req.get_body_data()  

local message = cjson.encode(log_json);  

local productId = ngx.req.get_uri_args()["productId"]

local async_producer = producer:new(broker_list, { producer_type = "async" })   
local ok, err = async_producer:send("access-log", productId, message)  

if not ok then  
    ngx.log(ngx.ERR, "kafka send err:", err)  
    return  
end

  

两台机器上都这样做,才能统一上报流量到kafka

bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create

bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning

(1)kafka在187上的节点死掉了,可能是虚拟机的问题,杀掉进程,重新启动一下

nohup bin/kafka-server-start.sh config/server.properties &

(2)需要在nginx.conf中,http部分,加入resolver 8.8.8.8;

(3)需要在kafka中加入advertised.host.name = 192.168.31.187,重启三个kafka进程

(4)需要启动eshop-cache缓存服务,因为nginx中的本地缓存可能不在了

基于storm+kafka完成商品访问次数实时统计拓扑的开发:

总结思路:

1、kafka consumer spout

单独的线程消费,写入队列

nextTuple,每次都是判断队列有没有数据,有的话再去获取并发射出去,不能阻塞

2、日志解析bolt

3、商品访问次数统计bolt

4、基于LRUMap完成统计

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

import com.roncoo.eshop.storm.bolt.LogParseBolt;
import com.roncoo.eshop.storm.bolt.ProductCountBolt;
import com.roncoo.eshop.storm.spout.AccessLogKafkaSpout;

/**
 * 热数据统计拓扑
 * @author Administrator
 *
 */
public class HotProductTopology {

        public static void main(String[] args) {
                TopologyBuilder builder = new TopologyBuilder();
        
                builder.setSpout("AccessLogKafkaSpout", new AccessLogKafkaSpout(), 1);
                builder.setBolt("LogParseBolt", new LogParseBolt(), 5)
                                .setNumTasks(5)
                                .shuffleGrouping("AccessLogKafkaSpout");  
                builder.setBolt("ProductCountBolt", new ProductCountBolt(), 5)
                                .setNumTasks(10)
                                .fieldsGrouping("LogParseBolt", new Fields("productId"));  
                
                Config config = new Config();
                
                if(args != null && args.length > 1) {
                        config.setNumWorkers(3);  
                        try {
                                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                } else {
                        LocalCluster cluster = new LocalCluster();
                        cluster.submitTopology("HotProductTopology", config, builder.createTopology());  
                        Utils.sleep(30000); 
                        cluster.shutdown();
                }
        }
        
}

  

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import com.alibaba.fastjson.JSONObject;

/**
 * 日志解析的bolt
 * @author Administrator
 *
 */
public class LogParseBolt extends BaseRichBolt {

        private static final long serialVersionUID = -8017609899644290359L;

        private OutputCollector collector;
        
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
        }
        
        public void execute(Tuple tuple) {
                String message = tuple.getStringByField("message");  
                JSONObject messageJSON = JSONObject.parseObject(message);
                JSONObject uriArgsJSON = messageJSON.getJSONObject("uri_args"); 
                Long productId = uriArgsJSON.getLong("productId"); 
                
                if(productId != null) {
                        collector.emit(new Values(productId));  
                }
        }
        
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("productId"));   
        }

}

  

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.shade.org.json.simple.JSONArray;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.util.LRUMap;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;

import com.roncoo.eshop.storm.zk.ZooKeeperSession;

/**
 * 商品访问次数统计bolt
 * @author Administrator
 *
 */
public class ProductCountBolt extends BaseRichBolt {

        private static final long serialVersionUID = -8761807561458126413L;

        private LRUMap<Long, Long> productCountMap = new LRUMap<Long, Long>(1000);
        private ZooKeeperSession zkSession;
        private int taskid;
        
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                this.zkSession = ZooKeeperSession.getInstance();
                this.taskid = context.getThisTaskId();
                
                new Thread(new ProductCountThread()).start();
                
                // 1、将自己的taskid写入一个zookeeper node中,形成taskid的列表
                // 2、然后每次都将自己的热门商品列表,写入自己的taskid对应的zookeeper节点
                // 3、然后这样的话,并行的预热程序才能从第一步中知道,有哪些taskid
                // 4、然后并行预热程序根据每个taskid去获取一个锁,然后再从对应的znode中拿到热门商品列表
                initTaskId(context.getThisTaskId());
        }
        
        private void initTaskId(int taskid) {
                // ProductCountBolt所有的task启动的时候, 都会将自己的taskid写到同一个node的值中
                // 格式就是逗号分隔,拼接成一个列表
                // 111,211,355
                
                zkSession.acquireDistributedLock();
                
                String taskidList = zkSession.getNodeData();
                if(!"".equals(taskidList)) {
                        taskidList += "," + taskid;
                } else {
                        taskidList += taskid;
                }
                
                zkSession.setNodeData("/taskid-list", taskidList);  
                
                zkSession.releaseDistributedLock();
        }
        
        private class ProductCountThread implements Runnable {
                
                public void run() {
                        List<Map.Entry<Long, Long>> topnProductList = new ArrayList<Map.Entry<Long, Long>>();   
                        
                        while(true) {
                                topnProductList.clear();
                                
                                int topn = 3;
                                
                                if(productCountMap.size() == 0) {
                                        Utils.sleep(100);
                                        continue;
                                }
                                
                                for(Map.Entry<Long, Long> productCountEntry : productCountMap.entrySet()) {
                                        if(topnProductList.size() == 0) {
                                                topnProductList.add(productCountEntry);
                                        } else {
                                                // 比较大小,生成最热topn的算法有很多种
                                                // 但是我这里为了简化起见,不想引入过多的数据结构和算法的的东西
                                                // 很有可能还是会有漏洞,但是我已经反复推演了一下了,而且也画图分析过这个算法的运行流程了
                                                boolean bigger = false;
                                                
                                                for(int i = 0; i < topnProductList.size(); i++){
                                                        Map.Entry<Long, Long> topnProductCountEntry = topnProductList.get(i);
                                                        
                                                        if(productCountEntry.getValue() > topnProductCountEntry.getValue()) {
                                                                int lastIndex = topnProductList.size() < topn ? topnProductList.size() - 1 : topn - 2;
                                                                for(int j = lastIndex; j >= i; j--) {
                                                                        topnProductList.set(j + 1, topnProductList.get(j));  
                                                                }
                                                                topnProductList.set(i, productCountEntry);
                                                                bigger = true;
                                                                break;
                                                        }
                                                }
                                                
                                                if(!bigger) {
                                                        if(topnProductList.size() < topn) {
                                                                topnProductList.add(productCountEntry);
                                                        }
                                                }
                                        }
                                }
                                
                                // 获取到一个topn list
                                String topnProductListJSON = JSONArray.toJSONString(topnProductList);
                                zkSession.setNodeData("/task-hot-product-list-" + taskid, topnProductListJSON);
                                
                                Utils.sleep(5000); 
                        }
                }
                
        }
        
        public void execute(Tuple tuple) {
                Long productId = tuple.getLongByField("productId"); 
                
                Long count = productCountMap.get(productId);
                if(count == null) {
                        count = 0L;
                }
                count++;
                
                productCountMap.put(productId, count);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                
        }

}

  

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * kafka消费数据的spout
 */
public class AccessLogKafkaSpout extends BaseRichSpout {

        private static final long serialVersionUID = 8698470299234327074L;

        private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
        
        private SpoutOutputCollector collector;
        
        @SuppressWarnings("rawtypes")
        public void open(Map conf, TopologyContext context,
                        SpoutOutputCollector collector) {
                this.collector = collector;
                startKafkaConsumer();
        }
        
        @SuppressWarnings("rawtypes")
        private void startKafkaConsumer() {
                Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181");
        props.put("group.id", "eshop-cache-group");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
                
                ConsumerConnector consumerConnector = Consumer.
                                createJavaConsumerConnector(consumerConfig);
                String topic = "access-log";
                
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
                        consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        
        for (KafkaStream stream : streams) {
            new Thread(new KafkaMessageProcessor(stream)).start();
        }
        }
        
        private class KafkaMessageProcessor implements Runnable {

                @SuppressWarnings("rawtypes")
                private KafkaStream kafkaStream;
                
                @SuppressWarnings("rawtypes")
                public KafkaMessageProcessor(KafkaStream kafkaStream) {
                        this.kafkaStream = kafkaStream;
                }
                
                @SuppressWarnings("unchecked")
                public void run() {
                        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
                while (it.hasNext()) {
                        String message = new String(it.next().message());
                        try {
                                        queue.put(message);
                                } catch (InterruptedException e) {
                                        e.printStackTrace();
                                } 
                }
                }
                
        }
        
        public void nextTuple() {
                if(queue.size() > 0) {
                        try {
                                String message = queue.take();
                                collector.emit(new Values(message));  
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                } else {
                        Utils.sleep(100);  
                }
        }
         
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("message"));  
        }
        
}

  

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * ZooKeeperSession
 * @author Administrator
 *
 */
public class ZooKeeperSession {
        
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        
        private ZooKeeper zookeeper;

        public ZooKeeperSession() {
                // 去连接zookeeper server,创建会话的时候,是异步去进行的
                // 所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接
                try {
                        this.zookeeper = new ZooKeeper(
                                        "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 
                                        50000, 
                                        new ZooKeeperWatcher());
                        // 给一个状态CONNECTING,连接中
                        System.out.println(zookeeper.getState());
                        
                        try {
                                // CountDownLatch
                                // java多线程并发同步的一个工具类
                                // 会传递进去一些数字,比如说1,2 ,3 都可以
                                // 然后await(),如果数字不是0,那么久卡住,等待
                                
                                // 其他的线程可以调用coutnDown(),减1
                                // 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态
                                // 继续向下运行
                                
                                connectedSemaphore.await();
                        } catch(InterruptedException e) {
                                e.printStackTrace();
                        }

                        System.out.println("ZooKeeper session established......");
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
        
        /**
         * 获取分布式锁
         * @param productId
         */
        public void acquireDistributedLock() {
                String path = "/taskid-list-lock";
        
                try {
                        zookeeper.create(path, "".getBytes(), 
                                        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                        System.out.println("success to acquire lock for taskid-list-lock");  
                } catch (Exception e) {
                        // 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错
                        // NodeExistsException
                        int count = 0;
                        while(true) {
                                try {
                                        Thread.sleep(1000); 
                                        zookeeper.create(path, "".getBytes(), 
                                                        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                                } catch (Exception e2) {
                                        count++;
                                        System.out.println("the " + count + " times try to acquire lock for taskid-list-lock......");
                                        continue;
                                }
                                System.out.println("success to acquire lock for taskid-list-lock after " + count + " times try......");
                                break;
                        }
                }
        }
        
        /**
         * 释放掉一个分布式锁
         * @param productId
         */
        public void releaseDistributedLock() {
                String path = "/taskid-list-lock";
                try {
                        zookeeper.delete(path, -1); 
                        System.out.println("release the lock for taskid-list-lock......");  
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
        
        public String getNodeData() {
                try {
                        return new String(zookeeper.getData("/taskid-list", false, new Stat()));  
                } catch (Exception e) {
                        e.printStackTrace();
                }
                return "";
        }
        
        public void setNodeData(String path, String data) {
                try {
                        zookeeper.setData(path, data.getBytes(), -1);
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
        
        /**
         * 建立zk session的watcher
         * @author Administrator
         *
         */
        private class ZooKeeperWatcher implements Watcher {

                public void process(WatchedEvent event) {
                        System.out.println("Receive watched event: " + event.getState());
                        if(KeeperState.SyncConnected == event.getState()) {
                                connectedSemaphore.countDown();
                        } 
                }
                
        }
        
        /**
         * 封装单例的静态内部类
         * @author Administrator
         *
         */
        private static class Singleton {
                
                private static ZooKeeperSession instance;
                
                static {
                        instance = new ZooKeeperSession();
                }
                
                public static ZooKeeperSession getInstance() {
                        return instance;
                }
                
        }
        
        /**
         * 获取单例
         * @return
         */
        public static ZooKeeperSession getInstance() {
                return Singleton.getInstance();
        }
        
        /**
         * 初始化单例的便捷方法
         */
        public static void init() {
                getInstance();
        }
        
}

  于双重zookeeper分布式锁完成分布式并行缓存预热:

1、服务启动的时候,进行缓存预热

2、从zk中读取taskid列表

3、依次遍历每个taskid,尝试获取分布式锁,如果获取不到,快速报错,不要等待,因为说明已经有其他服务实例在预热了

4、直接尝试获取下一个taskid的分布式锁

5、即使获取到了分布式锁,也要检查一下这个taskid的预热状态,如果已经被预热过了,就不再预热了

6、执行预热操作,遍历productid列表,查询数据,然后写ehcache和redis

7、预热完成后,设置taskid对应的预热状态

ZKsession重载两个方法:

/**
         * 获取分布式锁
         * @param productId
         */
        public boolean acquireFastFailedDistributedLock(String path) {
                try {
                        zookeeper.create(path, "".getBytes(), 
                                        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                        System.out.println("success to acquire lock for " + path);  
                        return true;
                } catch (Exception e) {
                        System.out.println("fail to acquire lock for " + path);  
                }
                return false;
        }

/**
         * 释放掉一个分布式锁
         * @param productId
         */
        public void releaseDistributedLock(String path) {
                try {
                        zookeeper.delete(path, -1); 
                        System.out.println("release the lock for " + path + "......");  
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
public String getNodeData(String path) {
                try {
                        return new String(zookeeper.getData(path, false, new Stat())); 
                } catch (Exception e) {
                        e.printStackTrace();
                }
                return "";
        }
        
        public void setNodeData(String path, String data) {
                try {
                        zookeeper.setData(path, data.getBytes(), -1);
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }

  

/**
         * 获取分布式锁
         */
        public void acquireDistributedLock(String path) {
                try {
                        zookeeper.create(path, "".getBytes(), 
                                        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                        System.out.println("success to acquire lock for " + path);  
                } catch (Exception e) {
                        // 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错
                        // NodeExistsException
                        int count = 0;
                        while(true) {
                                try {
                                        Thread.sleep(1000); 
                                        zookeeper.create(path, "".getBytes(), 
                                                        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                                } catch (Exception e2) {
                                        count++;
                                        System.out.println("the " + count + " times try to acquire lock for " + path + "......");
                                        continue;
                                }
                                System.out.println("success to acquire lock for " + path + " after " + count + " times try......");
                                break;
                        }
                }
        }

  

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.roncoo.eshop.cache.model.ProductInfo;
import com.roncoo.eshop.cache.service.CacheService;
import com.roncoo.eshop.cache.spring.SpringContext;
import com.roncoo.eshop.cache.zk.ZooKeeperSession;

/**
 * 缓存预热线程
 */
public class CachePrewarmThread extends Thread {
        
        @Override
        public void run() {
                CacheService cacheService = (CacheService) SpringContext.
                                getApplicationContext().getBean("cacheService"); 
                ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
                
                // 获取storm taskid列表
                String taskidList = zkSession.getNodeData("/taskid-list"); 
                
                if(taskidList != null && !"".equals(taskidList)) {
                        String[] taskidListSplited = taskidList.split(",");  
                        for(String taskid : taskidListSplited) {
                                String taskidLockPath = "/taskid-lock-" + taskid;
                                
                                boolean result = zkSession.acquireFastFailedDistributedLock(taskidLockPath);
                                if(!result) {
                                        continue;
                                }
                                
                                String taskidStatusLockPath = "/taskid-status-lock-" + taskid;
                                zkSession.acquireDistributedLock(taskidStatusLockPath);  
                                //检查越热的状态
                                String taskidStatus = zkSession.getNodeData("/taskid-status-" + taskid);
                                
                                if("".equals(taskidStatus)) {
                                        String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid);
                                        JSONArray productidJSONArray = JSONArray.parseArray(productidList);
                                        
                                        for(int i = 0; i < productidJSONArray.size(); i++) {
                                                Long productId = productidJSONArray.getLong(i);
                                                String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}";
                                                ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
                                                cacheService.saveProductInfo2LocalCache(productInfo);
                                                cacheService.saveProductInfo2ReidsCache(productInfo);  
                                        }
                                        
                                        zkSession.setNodeData(taskidStatusLockPath, "success");   
                                }
                                
                                zkSession.releaseDistributedLock(taskidStatusLockPath);
                                
                                zkSession.releaseDistributedLock(taskidLockPath);
                        }
                }
        }
        
}

  

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.roncoo.eshop.cache.model.ProductInfo;
import com.roncoo.eshop.cache.service.CacheService;
import com.roncoo.eshop.cache.spring.SpringContext;
import com.roncoo.eshop.cache.zk.ZooKeeperSession;

/**
 * 缓存预热线程
 */
public class CachePrewarmThread extends Thread {
        
        @Override
        public void run() {
                CacheService cacheService = (CacheService) SpringContext.
                                getApplicationContext().getBean("cacheService"); 
                ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
                
                // 获取storm taskid列表
                String taskidList = zkSession.getNodeData("/taskid-list"); 
                
                if(taskidList != null && !"".equals(taskidList)) {
                        String[] taskidListSplited = taskidList.split(",");  
                        for(String taskid : taskidListSplited) {
                                String taskidLockPath = "/taskid-lock-" + taskid;
                                
                                boolean result = zkSession.acquireFastFailedDistributedLock(taskidLockPath);
                                if(!result) {
                                        continue;
                                }
                                
                                String taskidStatusLockPath = "/taskid-status-lock-" + taskid;
                                zkSession.acquireDistributedLock(taskidStatusLockPath);  
                                //检查越热的状态
                                String taskidStatus = zkSession.getNodeData("/taskid-status-" + taskid);
                                
                                if("".equals(taskidStatus)) {
                                        String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid);
                                        JSONArray productidJSONArray = JSONArray.parseArray(productidList);
                                        
                                        for(int i = 0; i < productidJSONArray.size(); i++) {
                                                Long productId = productidJSONArray.getLong(i);
                                                String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}";
                                                ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
                                                cacheService.saveProductInfo2LocalCache(productInfo);
                                                cacheService.saveProductInfo2ReidsCache(productInfo);  
                                        }
                                        
                                        zkSession.setNodeData(taskidStatusLockPath, "success");   
                                }
                                
                                zkSession.releaseDistributedLock(taskidStatusLockPath);
                                
                                zkSession.releaseDistributedLock(taskidLockPath);
                        }
                }
        }
        
}