行使php+swoole对client数据实时更新(一)

南宁公司和几个分公司之间都使用了呼叫系统,所以需要把分公司的通话数据同步到南宁,我们将同步订单的任务表添加一个hash作为key,需要同步到仓储系统中去,此处采用Swoole进行php服务端的websocket开发,如果想和服务端进行通讯,前端代码,只实现了群发

梅里达公司和多少个支行之间都使用了呼叫系统,然后今后需求做四个呼叫通话数据深入分析,由于分集团的呼唤服务器是在内网,通过技能手腕映射出来,分企业到澳门之间的网络不安宁,所以须要把分部的通话数据同步到哈里斯堡。

引进背景:如果大家每一日有10000个订单生成,必要联合到仓库储存系统中去,之前做法是开启贰个crontab去跑这么些任务,可是发掘总有痛感同步功效低,间隔时间都以分钟等级的。

一旦想对一个列表压实时的革新,古板的做法是使用轮询的法子。以web为例,通过Ajax按期须要服务端然后获取数据展现在页面。这种措施贯彻轻松,缺点就是浪费能源。

鉴于本文的力量轻便,有无数闲话逻辑的细节尚未兑现,只兑现了群发,具体代码如下所示:

本身最轻便易行的点子便是间接配置MySQL的着力同步就足以联手数据到Cordova来了。然则出售呼叫系统那边的商家不给MySQL权限我们。
所以那些点子只可以遗弃了。

杀鸡取蛋方案测试:大家将一并订单的职务表增添八个hash作为key,作为分发条件,因为mysql中select若是做mod函数是用不到目录的,所以我们自身做随机hash,然而必须无需限制太大,避防服务器财富非常不足,方法是依照hashkey投放到不相同的历程中开始展览同步,测量检验代码如下

HTTP1.1新扩大了对websocket的协理,那样就足以将被动展现调换为积极通报。也正是通过websocket与服务端保持长久链接,一旦数据发生变化,由server布告client数据有更新,然后再展开刷新等操作。那样就节约了过多不须要的被动央求,节省了服务器能源。

php代码:

于是大家差不离的想,使用PHP来落到实处定期一个简易的PHP按期一并工具,然后PHP进程常驻后台运转,所以率先就先到了贰个PHP组件:SWOOLE,经过讨论,分集团的每天半天生成的数据量最大在5000条左右,所以那个方案是实用,就像此干。

<?php
/**
 * Created by PhpStorm.
 * User: xujun
 * Date: 2017/8/26
 * Time: 9:37
 */
//假定需要处理的数据如下
class Process{
  public $mpid=0;
  public $max_precess=5;
  //代替从数据库中读取的内容
  public $task = [
    ['uid'=>1,'uname'=>'bot','hash'=>1,'handle'=>'test'],
    ['uid'=>2,'uname'=>'bot1','hash'=>2,'handle'=>'test'],
    ['uid'=>3,'uname'=>'bot2','hash'=>3,'handle'=>'test'],
    ['uid'=>4,'uname'=>'bot3','hash'=>4,'handle'=>'test'],
    ['uid'=>2,'uname'=>'bot4','hash'=>2,'handle'=>'test'],
    ['uid'=>3,'uname'=>'bot5','hash'=>3,'handle'=>'test'],
    ['uid'=>4,'uname'=>'bot6','hash'=>1,'handle'=>'test'],
  ];
  public $works = [];
  public $swoole_table = NULL;
  //public $new_index=0;
  function test($index,$task){
    print_r("[".date('Y-m-d H:i:s')."]".'work-index:'.$index.'处理'.$task['uname'].'完成'.PHP_EOL);
  }

  public function __construct(){
    try {
      $this->swoole_table = new swoole_table(1024);
      $this->swoole_table->column('index', swoole_table::TYPE_INT);//用于父子进程间数据交换
      $this->swoole_table->create();

      swoole_set_process_name(sprintf('php-ps:%s', 'master'));
      $this->mpid = posix_getpid();
      $this->run();
      $this->processWait();
    }catch (\Exception $e){
      die('ALL ERROR: '.$e->getMessage());
    }
  }

  public function run(){
    for ($i=0; $i < $this->max_precess; $i++) {
      $this->CreateProcess();
    }
  }

  private function getTask($index){
    $_return = [];
    foreach ($this->task as $v){
      if($v['hash']==$index){
        $_return[] = $v;
      }
    }
    return $_return;
  }

  public function CreateProcess($index=null){
    if(is_null($index)){//如果没有指定了索引,新建的子进程,开启计数
      $index=$this->swoole_table->get('index');
      if($index === false){
        $index = 0;
      }else{
        $index = $index['index']+1;
      }
      print_r($index);
    }
    $this->swoole_table->set('index',array('index'=>$index));
    $process = new swoole_process(function(swoole_process $worker)use($index){

      swoole_set_process_name(sprintf('php-ps:%s',$index));
      $task = $this->getTask($index);
      foreach ($task as $v){
        call_user_func_array(array($this,$v['handle']),array($index,$v));
      }
      sleep(20);
    }, false, false);
    $pid=$process->start();

    $this->works[$index]=$pid;
    return $pid;
  }

  public function rebootProcess($ret){
    $pid=$ret['pid'];
    $index=array_search($pid, $this->works);
    if($index!==false){
      $index=intval($index);
      $new_pid=$this->CreateProcess($index);
      echo "rebootProcess: {$index}={$new_pid} Done\n";
      return;
    }
    throw new \Exception('rebootProcess Error: no pid');
  }

  public function processWait(){
    while(1) {
      if(count($this->works)){
        $ret = swoole_process::wait();
        if ($ret) {
          $this->rebootProcess($ret);
        }
      }else{
        break;
      }
    }
  }

}
$process = new Process();

要兑现五个webscoket的次序,首先供给使用协理html5的浏览器

$serv = new swoole_websocket_server("127.0.0.1",3999);
//服务的基本设置
$serv->set(array(
'worker_num' => 2,
'reactor_num'=>8,
'task_worker_num'=>1,
'dispatch_mode' => 2,
'debug_mode'=> 1,
'daemonize' => true,
'log_file' => __DIR__.'/log/webs_swoole.log',
'heartbeat_check_interval' => 60,
'heartbeat_idle_time' => 600,
));
$serv->on('connect', function ($serv,$fd){
// echo "client:$fd Connect.".PHP_EOL;
});
//测试receive
$serv->on("receive",function(swoole_server $serv,$fd,$from_id,$data){
// echo "receive#{$from_id}: receive $data ".PHP_EOL;
});
$serv->on('open', function($server, $req) {
// echo "server#{$server->worker_pid}: handshake success with fd#{$req->fd}".PHP_EOL;;
// echo PHP_EOL;
});
$serv->on('message',function($server,$frame) {
// echo "message: ".$frame->data.PHP_EOL;
$msg=json_decode($frame->data,true);
switch ($msg['type']){
case 'login':
$server->push($frame->fd,"欢迎欢迎~");
break;
default:
break;
}
$msg['fd']=$frame->fd;
$server->task($msg);
});
$serv->on("workerstart",function($server,$workerid){
// echo "workerstart: ".$workerid.PHP_EOL;
// echo PHP_EOL;
});
$serv->on("task","on_task");
$serv->on("finish",function($serv,$task_id,$data){
return ;
});
$serv->on('close', function($server,$fd,$from_id) {
// echo "connection close: ".$fd.PHP_EOL;
// echo PHP_EOL;
});
$serv->start();
function on_task($serv,$task_id,$from_id,$data) {
switch ($data['type']){
case 'login':
$send_msg="说:我来了~";
break;
default:
$send_msg="说:{$data['msg']['speak']}";
break;
}
foreach ($serv->connections as $conn){
if ($conn!=$data['fd']){
if (strpos($data['msg']['name'],"游客")===0){
$name=$data['msg']['name']."_".$data['fd'];
}else{
$name=$data['msg']['name'];
}
}else{
$name="我";
}
$serv->push($conn,$name.$send_msg);
}
return;
}
function on_finish($serv,$task_id,$data){
return true;
}

咱俩采取PHP SWOOLE 做多少个异步的定时职责系统。

此间代码中,使用了swoole_table作为进度间分享的内存,为了分配index。以及当进程退出后,父进程经过wait重新拉起该进程职分。

if(ws === null){
var wsServer = 'ws://'+ location.hostname +':8888';
ws = new WebSocket(wsServer);
ws.onopen = function(){
console.log("socket连接已打开");
};
ws.onmessage = function(e){
console.log("message:" + e.data);
};
ws.onclose = function(){
console.log("socket连接已断开");
};
ws.onerror = function(e){
console.log("ERROR:" + e.data);
};
//离开页面时关闭连接
$(window).bind('beforeunload',function(){
ws.close();
}
);
} 

前面三个代码:

自己MySQL数据库的骨干同步是经过分析Master库中的binary-log来实行联合数据到从库的。可是大家应用PHP来同步数据的时候,那么只好从master库分批查询数据,然后插入到普罗维登斯的slave库来了。

测量试验截图

如此那般就落到实处了三个client,可是工作还远未有甘休。下边包车型客车代码只是简短的进展了连接,对话,关闭等骨干动作。假若想和服务端实行广播发表,供给求有更切实的方案。举例收受message时判别项目进行进一步操作。

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>WebSocket测试</title> 
<script language="javascript"type="text/javascript" src="jquery-1.12.0.min.js"> 
</script>
</head>
<body>
<h2>WebSocket Test</h2> 
昵称:<input type="text" id="name" size="5" value="游客"/> <input type="text" id="content"> 
<button onclick="speak_to_all()">发送</button>
<br/><br/>
<textarea id="message" style="overflow-x:hidden" rows="10" cols="50"></textarea> 
<div id="output"></div>
</body> 
<script language="javascript"type="text/javascript"> 
var wsUri ="ws://127.0.0.1:3999/"; 
var output; 
function init() { 
output = document.getElementById("output"); 
testWebSocket();
}
function testWebSocket() { 
websocket = new WebSocket(wsUri); 
websocket.onopen = function(evt) { 
onOpen(evt) 
}; 
websocket.onclose = function(evt) { 
onClose(evt) 
}; 
websocket.onmessage = function(evt) { 
onMessage(evt) 
}; 
websocket.onerror = function(evt) { 
onError(evt) 
}; 
}
function get_speak_msg(){
var name=document.getElementById("name").value;
var speak=document.getElementById("content").value;
var json_msg='{"name":"'+name+'","speak":\"'+speak+'"}';
return json_msg;
}
function pack_msg(type,msg){
return '{"type":"'+type+'","msg":'+msg+'}';
}
function onOpen(evt) {
append_speak("已经联通服务器.........");
speak_msg=get_speak_msg();
send_msg=pack_msg("login",speak_msg);
doSend(send_msg);
}
function onClose(evt) { 
append_speak("俺老孙去也!");
} 
function onMessage(evt) {
append_speak(evt.data);
}
function onError(evt) {
alert(evt.data);
}
function doSend(message) { 
websocket.send(message);
}
function append_speak(new_msg){
document.getElementById("message").value=document.getElementById("message").value+new_msg+"\n";
document.getElementById('message').scrollTop = document.getElementById('message').scrollHeight;
}
function speak_to_all(){
send_msg=pack_msg("speak",get_speak_msg());
if(document.getElementById("content").value==""){
return;
}
doSend(send_msg);
document.getElementById("content").value="";
}
init();
</script>
</html>

此间我们应用的框架是 ThinkPHP 3.2 .

进程ps

服务端:此处选拔Swoole进行php服务端的websocket开荒,使用swoole进行php的websocket开垦特别轻易,何况它还帮忙httpserver的支持。

推荐介绍作品:php安装swoole扩大的秘籍   
使用swoole扩展php
websocket示例

率先安装PHP扩充:
SWOOLE,因为尚未动用到特别的法力,所以这里我们选拔pecl来飞速安装:

金沙澳门官网7817网址 1

$server = new swoole_websocket_server("0.0.0.0", 8888);
$server->on('open', function (swoole_websocket_server $server, $request) {
echo "server: handshake success with fd{$request->fd}\n";
});
$server->on('message', function (swoole_websocket_server $server, $frame) {
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
$server->push($frame->fd, "this is server");
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->start();

以上代码给大家大饱眼福了PHP+swoole达成轻便多个人在线聊天群发的连锁代码,希望本文所述对我们持有扶助。

pecl install swoole

结果 休眠20s后退出后会被电动拉起

swoole是八个php的扩张,安装形式能够参照这里:php安装swoole扩充的主意

您大概感兴趣的作品:

  • 详解thinkphp5+swoole实现异步邮件群发(SMTP情势)
  • 详解PHP swoole
    process的选取方法
  • Windows 下安装 swoole
    图文化教育程(php)
  • 在PHP
    7下安装Swoole与Yar,Yaf的措施教程
  • 金沙澳门官网7817网址,PHP使用SWOOLE扩大实现按时同步 MySQL
    数据
  • linux平台编写翻译安装PHP7并设置Redis扩大与Swoole扩张实例教程
  • 基于Swoole实现PHP与websocket聊天室
  • PHP的swoole扩展安装格局详细教程
  • 初识PHP中的Swoole
  • 使用php+swoole对client数据实时更新(一)
  • PHP技师学习使用Swoole的理由