最近在学习swoole,自己完成下论坛里留下的作业,通过swoole_websocket实时展示mysql数据,有个遗留问题,如何判断mysql是否有增删改,我想到的方法有:

1、在应用层中有增删改时,发一个消息到消息队列来监听。

2、解析mysql bin-log日志。

3、触发器。

现在的解决办法是利用触发器,例子中我只监听了一个表,当有多个表时,需要添加大量触发器,触发器本身占用很多资源,太多触发器也不好管理,之后会再想办法解析下bin-log日志来监听Mysql数据变化。

服务端代码

/**

* Created by PhpStorm.

* User: qyc

* Date: 2017/10/22

* Time: 下午4:40

*/

define('HOST', '0.0.0.0');

define('PORT', '9502');

define('WORKER_NUM', 8);

define('DISPATCH_MODE', 2);

define('DAEMONIZE', 0);

class swoole_ws

{

private $pdo;

private $server;

//这里写死了数据表,之后完成数据库监听后改写这里逻辑

private $table

= [

'swoole_test',

];

public function __construct()

{

$this->server = new swoole_websocket_server(HOST, PORT);

$this->server->set(

[

//开启woker进程数

'worker_num' => WORKER_NUM,

//请求分发策略,

'dispatch_mode' => DISPATCH_MODE,

//是否守护进程

'daemonize' => DAEMONIZE,

]

);

$this->server->on('message', [$this, 'onMessage']);

$this->server->on('open', [$this, 'onOpen']);

$this->server->on('workerStart', [$this, 'onWorkerStart']);

$this->server->start();

}

//woker进程

public function onWorkerStart(swoole_websocket_server $server, $worker_id)

{

if ($worker_id == 0) {

// 0 worker进程开启一个定时器去监听mysql数据变化

$this->server->tick(500, [$this, 'onTick']);

}

//每个worker进程维持一个pdo连接

$this->pdo = new PDO("mysql:host=localhost;dbname=swoole_ws", "root", "root");

}

//接受客户端数据

public function onMessage(swoole_websocket_server $server, swoole_websocket_frame $frame)

{

// echo $frame->data; //客户端发送的数据

// echo $server->worker_id;

}

//客户端 服务端建立连接并完成握手后的回调

public function onOpen(swoole_websocket_server $server, swoole_http_request $request)

{

//第一次连接去获取一下Mysql数据

$this->update();

}

private function update()

{

$res = [];

foreach ($this->table as $table) {

$res[$table] = $this->getTableData($table);

}

foreach ($this->server->connections as $connection) {

//向所有客户端发送数据

$this->server->push($connection, json_encode($res));

}

}

//获取表数据

private function getTableData($table)

{

$sql = 'select * from ' . $table;

try {

$stmt = $this->pdo->prepare($sql);

$stmt->execute();

$res = $stmt->fetchAll(PDO::FETCH_ASSOC);

return $res ?: [];

} catch (Exception $e) {

return [];

}

}

//定时器回调

public function onTick()

{

/*

* is_update表记录表是否更新过

* swoole_test表添加了3个触发器, after insert、update、delete,

* 会向is_update表更新swoole_test是否有更新操作

*/

try {

$sql = 'select is_update from is_update where table_name = "swoole_test"';

$stmt = $this->pdo->prepare($sql);

$stmt->execute();

$res = $stmt->fetchAll(PDO::FETCH_ASSOC);

if ( ! $res) {

return [];

}

if ($res[0]['is_update'] == 1) {

//当is_update字段为1时表明数据有更新,向客户端推送消息

$this->update();

//更新下表更新字段

$update = 'update is_update set is_update=0 where table_name = "swoole_test"';

$stmt = $this->pdo->prepare($update);

$stmt->execute();

}

} catch (Exception $e) {

$this->pdo = new PDO("mysql:host=localhost;dbname=swoole_ws", "root", "root");

}

}

}

new swoole_ws();

客户端代码

Mysql实时数据

integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">

crossorigin="anonymous">

swoole_test

var websocket = new WebSocket('ws://127.0.0.1:9502');

websocket.onopen = function (event) {

console.log('websocket connect');

websocket.send('hello swoole');

};

websocket.onclose = function (event) {

console.log('websocket close');

};

websocket.onerror = function (event, e) {

console.log('error occured:' + event.data);

};

websocket.onmessage = function (event) {

data = JSON.parse(event.data)['swoole_test'];

var th = '

';

var width = 1 / json_length(data[0]) * 100;

for (var key in data[0]) {

th += "

" + key + "";

}

th += '

';

$("#thead").html(th);

var tbody = '';

for (var line in data) {

tbody += '

';

var td = '';

for (var column in data[line]) {

td += "

" + data[line][column] + "";

}

tbody += td + '

';

}

$("#tbody").html(tbody);

};

function json_length(json) {

var length = 0;

for (var item in json) {

length++;

}

return length;

}

触发器

DELIMITER $$

/*更新触发器*/

DROP TRIGGER IF EXISTS swoole_test_is_update;

$$

CREATE TRIGGER swoole_test_is_update AFTER UPDATE ON swoole_test FOR EACH ROW

BEGIN

UPDATE `is_update`

SET is_update = 1

WHERE TABLE_NAME = 'swoole_test';

END;

$$

/*添加触发器*/

DROP TRIGGER IF EXISTS swoole_test_is_insert;

$$

CREATE TRIGGER swoole_test_is_insert AFTER UPDATE ON swoole_test FOR EACH ROW

BEGIN

UPDATE `is_update`

SET is_update = 1

WHERE TABLE_NAME = 'swoole_test';

END;

$$

/*删除触发器*/

DROP TRIGGER IF EXISTS swoole_test_is_delete;

$$

CREATE TRIGGER swoole_test_is_delete AFTER UPDATE ON swoole_test FOR EACH ROW

BEGIN

UPDATE `is_update`

SET is_update = 1

WHERE TABLE_NAME = 'swoole_test';

END;

$$

DELIMITER ;

开启websocker服务端

php swoole_ws;

运行效果

现在访问前端页面,对数据库进行增删改操作测试效果。

9f27a6d7c29a?from=groupmessage

1.png

结语

这只是个swoole_websocket练习,熟悉一下,大家可以看看Swoole官方文档进行学习,会有不错的提升,之后会做个即时聊天工具玩玩,Just code for fun ~

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐