IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    详解Ecstore计划任务的代码逻辑

    tiandi发表于 2016-08-05 02:40:39
    love 0

    Ecos的框架是建立在linux的基础上的,在windows上是会缺胳膊少腿的,其中计划任务用的是linux的crontab,之前网上见过不少人碰到ecstore的计划任务执行报错的案例,也给出了一定的解决方法,这其中有些是误打误撞。tiandi认为,一段代码没有执行或者报错,必须得从它的源头查起。

    让我们来看看ecos的计划任务到底是怎么运行的:

    1. 执行/app/system/controller/admin/crontab.php中的exec方法,执行base_contab_schedule::trigger_once($cron[‘id’])。

    function exec($cron_id) {
            $this->begin('index.php?app=system&ctl=admin_crontab&act=index');
            $model = app::get('base')->model('crontab');
            $cron = $model->getRow('id', array('id'=>$cron_id));
            if(!$cron || (base_crontab_schedule::trigger_one($cron['id'])===false)) {
                $this->end(false, '执行失败');
            }
            $this->end(true, '执行成功');
        }
    

    2. /base/lib/crontab/schedule.php的trigger_once方法,$cron_id为任务类名。执行system_queue::instance()->publish(‘crontab:’.$worker, $worker)后,会打印日志在/data/crontab/下,并且将crontab表该任务的last字段做时间更新。

    static public function trigger_one($cron_id){
            if ($cron = app::get('base')->model('crontab')->getRow('id, last', array('id' => $cron_id, 'enabled => true'))){
                $now = time();            
                if (($now - $cron['last'])<60) {
                    trigger_error(app::get('base')->_('1分钟之内不能重复执行'), E_USER_ERROR);
                }
    
                //add_task
                $worker = $cron['id'];
                system_queue::instance()->publish('crontab:'.$worker, $worker);
                self::__log($cron_id, $now, 'add queue ok');
                app::get('base')->model('crontab')->update(array('last'=>$now), array('id' => $cron_id));
            }
        }
    

    3. /app/system/lib/queue.php中的publish方法,$queues的值为array(‘slow’),array(‘quick’),array(‘nor mal’)中的一个,然后请求控制器的publish。

    public function publish($exchange_name, $worker, $params=array(), $routing_key=null){
            $queues = $this->__get_publish_queues($exchange_name);
            foreach($queues as $queue_name){
                $queue_data = array(
                    'queue_name' => $queue_name,
                    'worker' => $worker,
                    'params' => $params);
    
                $this->get_controller()->publish($queue_name, $queue_data);
            }
            return true;
        }
    

    4. /app/system/lib/queue/adapter/mysql.php的publish方法,向system_queue_mysql表插入一条记录,默认该记录own_thread_id为-1,在dbschema表中定义。

    public function publish($queue_name,$queue_data){
            $time = time();
            $data = array('queue_name' => $queue_data['queue_name'],
                          'worker' => $queue_data['worker'],
                          'params' => serialize((array)$queue_data['params']),
                          'create_time' => $time);
    
            return $this->__model->insert($data);
        }
    

    5. 到此点击时间处理完毕,并不会真正的执行计划任务。执行还是靠的linux的cron发起的php请求。linux下设置了两条cron,分别是
    * * * * * /srv/www/www.suanjuzi.com/script/queue/queue.sh /opt/php/bin/php >/dev
    * * * * * /opt/php/bin/php /srv/www/www.suanjuzi.com/script/crontab/crontab.php

    6. 先看上面的第2条/script/crontab/crontab.php,实际上这个文件就主要做了一件事,调用base_crontab_schedule::trigger_all(),和后台点执行调用的trigger_one的代码逻辑差不多,只是这里是触发了所有的该执行的ecstore后台计划任务。

    7. 回过来看上面第1条命令,script/queue/queue.sh是执行命令,后面是参数。
    打开shell文件,可以看到queuelist是从/config/queuelist.php这个默认定义的文件返回,返回的值为slow,quick,normal,然后通过checkprocess查看当前linux进程是否有存在/script/queue.php slow[quick][normal],在的话显示active,不在的话,请求该php文件。之前计划任务不跑的原因就在这里,作为我们后面加的计划任务,都是属于normal,结果这里进程一直存在,所以就不会再继续再执行normal的计划任务,只会执行原先默认的quick和slow的计划任务。

    8. /script/queue.php执行了下面代码

    $queue_name = $argv[1];
    $queues = system_queue::instance()->get_config('queues');
    if ($num = (int)$queues[$queue_name]['thread']) {
        system_queue_consumer::instance('proc')->exec($queue_name, $num);
    }
    

    9. /app/system/lib/queue/consumer/proc.php中的exec方法,当前线程小于可允许最大值(/config/queuelist.php里定义)时,并且queue_mysql表的own_thread_id为-1时,通过new system_queue_consumer_proc_thread来创建新的线程,

    while ($this->threadRunning < $max && !system_queue::instance()->is_end($queue_name)) {
          $this->running[] = new system_queue_consumer_proc_thread($queue_name,$phpExec);
          usleep(200000);
          $this->threadRunning++;
    }
    

    10. /app/system/lib/queue/consumer/proc/thread.php,通过proc_open建立新线程,超过2次报错,但是在出问题normal卡住的时候这边也没报错,所以应该跟这个throw无关。

    while (($this->resource = proc_open($executable." ".$script." ".$queue_name, $descriptorspec, $this->pipes, NULL, $_ENV))===null) {
         $i++;
         if ($i>2) {
              throw new Exception(' cannot create new proccess for consume queue.', 30001);
         }
    }
    

    11.proc_open的第一个参数是命令行,调用PHP执行 /script/queue/queuescript.php $queue_name,

    if($queue_message = system_queue::instance()->get($queue_name)){
        system_queue::instance()->run_task($queue_message);
        system_queue::instance()->ack($queue_message);
    }
    

    12./app/system/lib/queue.php里的run_task和ack方法。Run_task最终调用的自定义计划任务类的exec方法,而ack则最后删除了数据表里的相应记录。

    public function run_task($queue_message){
            //todo: 异常处理
            $worker = $queue_message->get_worker();
            $params = $queue_message->get_params();
    
            $obj_task = new $worker();
            if ($obj_task instanceof base_interface_task) {
                call_user_func_array(array($obj_task, 'exec'), array($params));
                logger::info('task:'. get_class($obj_task). ' exec ok');
            }
            return true;
        }
    public function ack($queue_message){
            $queue_id = $queue_message->get_id();
            return $this->__model->delete(array('id'=>$queue_id));
        }
    
    文章评分2次,平均分5.0:★★★★★


沪ICP备19023445号-2号
友情链接