這篇文章給大家分享的內容是關于Swoft 原始碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下,
前言
Swoft的任務功能基于Swoole的Task機制,或者說Swoft的Task機制本質就是對Swoole的Task機制的封裝和加強,
任務投遞
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
//Swoft\Task\Task.php
class Task
{
/**
* Deliver coroutine or async task
*
* @param string $taskName
* @param string $methodName
* @param array $params
* @param string $type
* @param int $timeout
*
* @return bool|array
* @throws TaskException
*/
public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
{
$data = TaskHelper::pack($taskName, $methodName, $params, $type);
if(!App::isWorkerStatus() && !App::isCoContext()){
return self::deliverByQueue($data);//見下文Command章節
}
if(!App::isWorkerStatus() && App::isCoContext()){
throw new TaskException('Please deliver task by http!');
}
$server = App::$server->getServer();
// Delier coroutine task
if ($type == self::TYPE_CO) {
$tasks[0] = $data;
$prifleKey = 'task' . '.' . $taskName . '.' . $methodName;
App::profileStart($prifleKey);
$result = $server->taskCo($tasks, $timeout);
App::profileEnd($prifleKey);
return $result;
}
// Deliver async task
return $server->task($data);
}
}
|
任務投遞Task::deliver()將呼叫引數打包后根據$type引數通過Swoole的$server->taskCo()或$server->task()介面投遞到Task行程,
Task本身始終是同步執行的,$type僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC對應的$server->task()是異步投遞,Task::deliver()呼叫后馬上回傳;Task::TYPE_CO對應的$server->taskCo()是協程投遞,投遞后讓出協程控制,任務完成或執行超時后Task::deliver()才從協程回傳,
任務執行
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
//Swoft\Task\Bootstrap\Listeners\TaskEventListener
/**
* The listener of swoole task
* @SwooleListener({
* SwooleEvent::ON_TASK,
* SwooleEvent::ON_FINISH,
* })
*/
class TaskEventListener implements TaskInterface, FinishInterface
{
/**
* @param \Swoole\Server $server
* @param int $taskId
* @param int $workerId
* @param mixed $data
* @return mixed
* @throws \InvalidArgumentException
*/
public function onTask(Server $server, int $taskId, int $workerId, $data)
{
try {
/* @var TaskExecutor $taskExecutor*/
$taskExecutor = App::getBean(TaskExecutor::class);
$result = $taskExecutor->run($data);
} catch (\Throwable $throwable) {
App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
$result = false;
// Release system resources
App::trigger(AppEvent::RESOURCE_RELEASE);
App::trigger(TaskEvent::AFTER_TASK);
}
return $result;
}
}
|
此處是swoole.onTask的事件回呼,其職責僅僅是將將Worker行程投遞來的打包后的資料轉發給TaskExecutor,
Swoole的Task機制的本質是Worker行程將耗時任務投遞給同步的Task行程(又名TaskWorker)處理,所以swoole.onTask的事件回呼是在Task行程中執行的,上文說過,Worker行程是你大部分HTTP服務代碼執行的環境,但是從TaskEventListener.onTask()方法開始,代碼的執行環境都是Task行程,也就是說,TaskExecutor和具體的TaskBean都是執行在Task行程中的,
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
//Swoft\Task\TaskExecutor
/**
* The task executor
*
* @Bean()
*/
class TaskExecutor
{
/**
* @param string $data
* @return mixed
*/
public function run(string $data)
{
$data = TaskHelper::unpack($data);
$name = $data['name'];
$type = $data['type'];
$method = $data['method'];
$params = $data['params'];
$logid = $data['logid'] ?? uniqid('', true);
$spanid = $data['spanid'] ?? 0;
$collector = TaskCollector::getCollector();
if (!isset($collector['task'][$name])) {
return false;
}
list(, $coroutine) = $collector['task'][$name];
$task = App::getBean($name);
if ($coroutine) {
$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
} else {
$result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
}
return $result;
}
}
|
任務執行思路很簡單,將Worker行程發過來的資料解包還原成原來的呼叫引數,根據$name引數找到對應的TaskBean并呼叫其對應的task()方法,其中TaskBean使用類級別注解@Task(name="TaskName")或者@Task("TaskName")宣告,
值得一提的一點是,@Task注解除了name屬性,還有一個coroutine屬性,上述代碼會根據該引數選擇使用協程的runCoTask()或者同步的runSyncTask()執行Task,但是由于而且由于Swoole的Task行程的執行是完全同步的,不支持協程,所以目前版本請該引數不要配置為true,同樣的在TaskBean中撰寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將異步非阻塞和協程降級為同步阻塞的
從Process中投遞任務
前面我們提到:
Swoole的Task機制的本質是Worker行程將耗時任務投遞給同步的Task行程(又名 TaskWorker)處理,
換句話說,Swoole的$server->taskCo()或$server->task()都只能在Worker行程中使用,
這個限制大大的限制了使用場景, 如何能夠為了能夠在Process中投遞任務呢?Swoft為了繞過這個限制提供了Task::deliverByProcess()方法,其實作原理也很簡單,通過Swoole的$server->sendMessage()方法將呼叫資訊從Process中投遞到Worker行程中,然后由Worker行程替其投遞到Task行程當中,相關代碼如下:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
//Swoft\Task\Task.php
/**
* Deliver task by process
*
* @param string $taskName
* @param string $methodName
* @param array $params
* @param string $type
* @param int $timeout
* @param int $workId
*
* @return bool
*/
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
/* @var PipeMessageInterface $pipeMessage */
$server = App::$server->getServer();
$pipeMessage = App::getBean(PipeMessage::class);
$data = [
'name' => $taskName,
'method' => $methodName,
'params' => $params,
'timeout' => $timeout,
'type' => $type,
];
$message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
return $server->sendMessage($message, $workId);
}
|
資料打包后使用$server->sendMessage()投遞給Worker:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
//Swoft\Bootstrap\Server\ServerTrait.php
/**
* onPipeMessage event callback
*
* @param \Swoole\Server $server
* @param int $srcWorkerId
* @param string $message
* @return void
* @throws \InvalidArgumentException
*/
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
/* @var PipeMessageInterface $pipeMessage */
$pipeMessage = App::getBean(PipeMessage::class);
list($type, $data) = $pipeMessage->unpack($message);
App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}
|
$server->sendMessage后,Worker行程收到資料時會觸發一個swoole.pipeMessage事件的回呼,Swoft會將其轉換成自己的swoft.pipeMessage事件并觸發.
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
* The pipe message listener
*
* @Listener(event=AppEvent::PIPE_MESSAGE)
*/
class PipeMessageListener implements EventHandlerInterface
{
/**
* @param \Swoft\Event\EventInterface $event
*/
public function handle(EventInterface $event)
{
$params = $event->getParams();
if (count($params) < 3) {
return;
}
list($type, $data, $srcWorkerId) = $params;
if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
return;
}
$type = $data['type'];
$taskName = $data['name'];
$params = $data['params'];
$timeout = $data['timeout'];
$methodName = $data['method'];
// delever task
Task::deliver($taskName, $methodName, $params, $type, $timeout);
}
}
|
swoft.pipeMessage事件最終由PipeMessageListener處理,在相關的監聽其中,如果發現swoft.pipeMessage事件由Task::deliverByProcess()產生的,Worker行程會替其執行一次Task::deliver(),最終將任務資料投遞到TaskWorker行程中,
一道簡單的回顧練習:從Task::deliverByProcess()到某TaskBean 最終執行任務,經歷了哪些行程,而呼叫鏈的哪些部分又分別是在哪些行程中執行?
從Command行程或其子行程中投遞任務
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
//Swoft\Task\QueueTask.php
/**
* @param string $data
* @param int $taskWorkerId
* @param int $srcWorkerId
*
* @return bool
*/
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
if ($taskWorkerId === null) {
$taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
}
if ($srcWorkerId === null) {
$srcWorkerId = mt_rand(0, $this->workerNum - 1);
}
$this->check();
$data = $this->pack($data, $srcWorkerId);
$result = \msg_send($this->queueId, $taskWorkerId, $data, false);
if (!$result) {
return false;
}
return true;
}
|
對于Command行程的任務投遞,情況會更復雜一點,
上文提到的Process,其往往衍生于Http/Rpc服務,作為同一個Manager的子孫行程,他們能夠拿到Swoole\Server的句柄變數,從而通過$server->sendMessage(),$server->task()等方法進行任務投遞,
但在Swoft的體系中,還有一個十分路人的角色: Command,
Command的行程從shell或cronb獨立啟動,和Http/Rpc服務相關的行程沒有親緣關系,因此Command行程以及從Command中啟動的Process行程是沒有辦法拿到Swoole\Server的呼叫句柄直接通過UnixSocket進行任務投遞的,
為了為這種行程提供任務投遞支持,Swoft利用了Swoole的Task行程的一個特殊功能----訊息佇列,

同一個專案中Command和Http\RpcServer 通過約定一個message_queue_key獲取到系統內核中的同一條訊息佇列,然后Comand行程就可以通過該訊息佇列向Task行程投遞任務了,
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()方法中,Swoft會根據當前環境隱式切換投遞方式,但該訊息佇列的實作依賴Semaphore拓展,如果你想使用,需要在編譯PHP時加上--enable-sysvmsg引數,
定時任務
除了手動執行的普通任務,Swoft還提供了精度為秒的定時任務功能用來在專案中替代Linux的Crontab功能.
Swoft用兩個前置Process---任務計劃行程:CronTimerProcess和任務執行行程CronExecProcess
,和兩張記憶體資料表-----RunTimeTable(任務(配置)表)OriginTable((任務)執行表)用于定時任務的管理調度,
兩張表的每行記錄的結構如下:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
\\Swoft\Task\Crontab\TableCrontab.php
/**
* 任務表,記錄用戶配置的任務資訊
* 表每行記錄包含的欄位如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一確定一條記錄
* @var array $originStruct
*/
private $originStruct = [
'rule' => [\Swoole\Table::TYPE_STRING, 100],//定時任務執行規則,對應@Scheduled注解的cron屬性
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//任務名 對應@Task的name屬性(默認為類名)
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,對應@Scheduled注解所在方法
'add_time' => [\Swoole\Table::TYPE_STRING, 11],//初始化該表內容時的10位時間戳
];
/**
* 執行表,記錄短時間內要執行的任務串列及其執行狀態
* 表每行記錄包含的欄位如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一確定一條記錄
* @var array $runTimeStruct
*/
private $runTimeStruct = [
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//同上
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
'minute' => [\Swoole\Table::TYPE_STRING, 20],//需要執行任務的時間,精確到分鐘 格式date('YmdHi')
'sec' => [\Swoole\Table::TYPE_STRING, 20],//需要執行任務的時間,精確到分鐘 10位時間戳
'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],//任務狀態,有 0(未執行) 1(已執行) 2(執行中) 三種,
//注意:這里的執行是一個容易誤解的地方,此處的執行并不是指任務本身的執行,而是值`任務投遞`這一操作的執行,從宏觀上看換成 _未投遞_,_已投遞_,_投遞中_描述會更準確,
];
|
此處為何要使用Swoole的記憶體Table?
Swoft的的定時任務管理是分別由 任務計劃行程 和 任務執行行程 行程負責的,兩個行程的運行共同管理定時任務,如果使用行程間獨立的array()等結構,兩個行程必然需要頻繁的行程間通信,而使用跨行程的Table(本文的Table,除非特別說明,都指Swoole的Swoole\Table結構)直接進行行程間資料共享,不僅性能高,操作簡單 還解耦了兩個行程,
為了Table能夠在兩個行程間共同使用,Table必須在Swoole Server啟動前創建并分配記憶體,具體代碼在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中,比較簡單,有興趣的可以自行閱讀,
背景介紹完了,我們來看看這兩個定時任務行程的行為
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
* Crontab timer process
*
* @Process(name="cronTimer", boot=true)
*/
class CronTimerProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
//code....
/* @var \Swoft\Task\Crontab\Crontab $cron*/
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$time = (60 - date('s')) * 1000;
$server->after($time, function () use ($server, $cron) {
// Every minute check all tasks, and prepare the tasks that next execution point needs
$cron->checkTask();
$server->tick(60 * 1000, function () use ($cron) {
$cron->checkTask();
});
});
}
}
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
//Swoft\Task\Crontab\Crontab.php
/**
* 初始化runTimeTable資料
*
* @param array $task 任務
* @param array $parseResult 決議crontab命令規則結果,即Task需要在當前分鐘內的哪些秒執行
* @return bool
*/
private function initRunTimeTableData(array $task, array $parseResult): bool
{
$runTimeTableTasks = $this->getRunTimeTable()->table;
$min = date('YmdHi');
$sec = strtotime(date('Y-m-d H:i'));
foreach ($parseResult as $time) {
$this->checkTaskQueue(false);
$key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
$runTimeTableTasks->set($key, [
'taskClass' => $task['taskClass'],
'taskMethod' => $task['taskMethod'],
'minute' => $min,
'sec' => $time + $sec,
'runStatus' => self::NORMAL
]);
}
return true;
}
|
CronTimerProcess是Swoft的定時任務調度行程,其核心方法是Crontab->initRunTimeTableData(),
該行程使用了Swoole的定時器功能,通過Swoole\Timer在每分鐘首秒時執行的回呼,CronTimerProcess每次被喚醒后都會遍歷任務表計算出當前這一分鐘內的60秒分別需要執行的任務清單,寫入執行表并標記為 未執行,
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
//Swoft\Task\Bootstrap\Process
/**
* Crontab process
*
* @Process(name="cronExec", boot=true)
*/
class CronExecProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
$pname = App::$server->getPname();
$process->name(sprintf('%s cronexec process', $pname));
/** @var \Swoft\Task\Crontab\Crontab $cron */
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$server->tick(0.5 * 1000, function () use ($cron) {
$tasks = $cron->getExecTasks();
if (!empty($tasks)) {
foreach ($tasks as $task) {
// Diliver task
Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
$cron->finishTask($task['key']);
}
}
});
}
}
|
CronExecProcess作為定時任務的執行者,通過Swoole\Timer每0.5s喚醒自身一次,然后把 執行表 遍歷一次,挑選當下需要執行的任務,通過sendMessage()投遞出去并更新該 任務執行表中的狀態,
該執行行程只負責任務的投遞,任務的實際實際執行仍然在Task行程中由TaskExecutor處理,
定時任務的宏觀執行情況如下:

明確的學習思路能更高效的學習

點擊加入該群學習
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/121046.html
標籤:PHP
上一篇:C鏈表做多項式加減法
下一篇:void Pos(int x,int y){