/**

* Created by PhpStorm.

* User: qyc

* Date: 2017/10/22

* Time: 下午4:40

*/

define('HOST', '0.0.0.0');

define('PORT', '9502');

define('WORKER_NUM', 8);

define('DISPATCH_MODE', 2);

define('DAEMONIZE', 0);

class swoole_ws

{

private $pdo;

private $server;

//这里写死了数据表,之后完成数据库监听后改写这里逻辑

private $table

= [

'swoole_test',

];

public function __construct()

{

$this->server = new swoole_websocket_server(HOST, PORT);

$this->server->set(

[

//开启woker进程数

'worker_num' => WORKER_NUM,

//请求分发策略,

'dispatch_mode' => DISPATCH_MODE,

//是否守护进程

'daemonize' => DAEMONIZE,

]

);

$this->server->on('message', [$this, 'onMessage']);

$this->server->on('open', [$this, 'onOpen']);

$this->server->on('workerStart', [$this, 'onWorkerStart']);

$this->server->start();

}

//woker进程

public function onWorkerStart(swoole_websocket_server $server, $worker_id)

{

if ($worker_id == 0) {

// 0 worker进程开启一个定时器去监听mysql数据变化

$this->server->tick(500, [$this, 'onTick']);

}

//每个worker进程维持一个pdo连接

$this->pdo = new PDO("mysql:host=localhost;dbname=swoole_ws", "root", "root");

}

//接受客户端数据

public function onMessage(swoole_websocket_server $server, swoole_websocket_frame $frame)

{

// echo $frame->data; //客户端发送的数据

// echo $server->worker_id;

}

//客户端 服务端建立连接并完成握手后的回调

public function onOpen(swoole_websocket_server $server, swoole_http_request $request)

{

//第一次连接去获取一下Mysql数据

$this->update();

}

private function update()

{

$res = [];

foreach ($this->table as $table) {

$res[$table] = $this->getTableData($table);

}

foreach ($this->server->connections as $connection) {

//向所有客户端发送数据

$this->server->push($connection, json_encode($res));

}

}

//获取表数据

private function getTableData($table)

{

$sql = 'select * from ' . $table;

try {

$stmt = $this->pdo->prepare($sql);

$stmt->execute();

$res = $stmt->fetchAll(PDO::FETCH_ASSOC);

return $res ?: [];

} catch (Exception $e) {

return [];

}

}

//定时器回调

public function onTick()

{

/*

* is_update表记录表是否更新过

* swoole_test表添加了3个触发器, after insert、update、delete,

* 会向is_update表更新swoole_test是否有更新操作

*/

try {

$sql = 'select is_update from is_update where table_name = "swoole_test"';

$stmt = $this->pdo->prepare($sql);

$stmt->execute();

$res = $stmt->fetchAll(PDO::FETCH_ASSOC);

if ( ! $res) {

return [];

}

if ($res[0]['is_update'] == 1) {

//当is_update字段为1时表明数据有更新,向客户端推送消息

$this->update();

//更新下表更新字段

$update = 'update is_update set is_update=0 where table_name = "swoole_test"';

$stmt = $this->pdo->prepare($update);

$stmt->execute();

}

} catch (Exception $e) {

$this->pdo = new PDO("mysql:host=localhost;dbname=swoole_ws", "root", "root");

}

}

}

new swoole_ws();

Mysql实时数据

integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">

crossorigin="anonymous">

swoole_test

var websocket = new WebSocket('ws://127.0.0.1:9502');

websocket.onopen = function (event) {

console.log('websocket connect');

websocket.send('hello swoole');

};

websocket.onclose = function (event) {

console.log('websocket close');

};

websocket.onerror = function (event, e) {

console.log('error occured:' + event.data);

};

websocket.onmessage = function (event) {

data = JSON.parse(event.data)['swoole_test'];

var th = '

';

var width = 1 / json_length(data[0]) * 100;

for (var key in data[0]) {

th += "

" + key + "";

}

th += '

';

$("#thead").html(th);

var tbody = '';

for (var line in data) {

tbody += '

';

var td = '';

for (var column in data[line]) {

td += "

" + data[line][column] + "";

}

tbody += td + '

';

}

$("#tbody").html(tbody);

};

function json_length(json) {

var length = 0;

for (var item in json) {

length++;

}

return length;

}

链接:https://www.jianshu.com/p/9f27a6d7c29a

来源:简书

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐