PHP RabbitMQ 教程(四) - 路由

路由

(使用php-amqplib) 在上一节中,我们创建了一个简单的日志系统(logging system)。我们已经可以广播日志消息到多个接收者了。

在本节中,我们要给它增加一个功能-使它能够只订阅消息的一个子集。比如,只把严重的错误信息写入到日志文件(存储到磁盘)中,但同时仍然会把所有日志信息输出到控制台中。

绑定

在上一节中我们已经创建了绑定(bindings),代码如下:

$channel->queue_bind($queue_name,'logs');

绑定(bindings)是指交换器(exchange)和队列(queue)的关系。可以简单的理解为:这个队列对这个交换器中的消息感兴趣。

绑定的时候可以带一个额外的 routing_key 参数。为了避免与$channel::basic_publish的参数混淆,我们把它叫做 binding_key,所以我们这样使用key创建一个绑定:

$binding_key = 'black';
$channel->queue_bind($queue_name,$exchange_name,$binding_key);

binding key的意义取决于交换器的类型。我们之前使用过的fanout类型的交换器,会忽略这个值。

Direct 交换器

之前创建的日志系统分发所有消息到所有的消费者。我们打算扩展一下,使它可以过滤严重的消息。比如,我们只想在接收到严重错误的时候才写入到磁盘中,不在警告或普通的消息上浪费磁盘空间。

我们使用的是没有太多扩展性的fanout交换器,它仅能够简单的广播消息。

我们将要使用一个direct交换器代替fanout交换器。路由算法很简单-只有binding key完全匹配routing key的消息会进入队列。

为了说明,考虑如下的场景:

在这个场景中,我们可以看到direct类型的交换器X有两个队列,第一个队列使用orange作为binding key,第二个队列有两个绑定,一个是black另一个是green。

在这个场景中,当routing key为orange的消息发送到交换器,将会被路由到队列Q1。routing key为black或green的消息将会发送到Q2。其他的消息则会被丢弃。

多个绑定

使用相同的binding key绑定多个队列是合法的。在这个例子中,我们会使用black作为binding key为X和Q1之间添加一个绑定。这样一来,direct 交换器就表现得跟fanout交换器一样,分发消息到匹配的队列。routing key为black的消息就会被分发到Q1和Q2。

发送日志

我们将要对日志系统使用这个模型,我们将要发送消息到一个direct交换器。将日志级别作为routing key。这样一来接收端程序就可以选择它想要接收的消息了。首先来看看发送日志。

和以往一样,需要创建一个交换器:

$channel->exchange_declare('direct_logs','direct',false,false,false);

然后准备发送消息:

$channel->exchange_declare('direct_logs','direct',false,false,false);
$channel->basic_publish($msg,'direct_logs',$severity);

为了简化,我们可以假定’severity’的值可以是’info’,‘warning’,‘error’中的一个。

订阅

接收消息的脚本会跟之前一样正常工作,但是我们准备为每一个我们感兴趣的日志级别创建一个新的绑定。

foreach($severities as $severity){
	$channel->queue_bind($queue_name,'direct_logs',$severity);
}

整合

emit_log_direct.php类的代码为:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStremConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs','direct',false,false,false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ',array_slice($argv,2));
if(empty($data)) $data = “Hello World!”;

$msg = “[x] Sent ”,$severity,':',$data,” \n”;

$channel->close();
$connection->close();
?>

receive_logs_direct.php的代码为:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs','direct',false,false,false);

list($queue_name, ,) = $channel->queue_declare(“”,false,false,true,false);

$severities = array_slice($argv,1);
if(empty($severities)){
	file_put_contents('php://stderr',”Usage:$argv[0][info][warning][error]\n”);
	exit(1);
}

foreach($severities as $severity){
	$channel->queue_bind($queue_name,'direct_logs',$severity);
}

echo '[*]Waiting for logs.To exit press CTRL+C',”\n”;

$callback = function($msg){
	echo '[*]',$msg->delivery_info['routing_key'],':',$msg->body,”\n”;	
};

$channel->basic_consume($queue_name,'',false,true,false,false,$callback);

while(count($channel->callbacks)){
	$channel->wait();
}

$channel->close();
$connection->close();
?>

如果你想只保存’warning’或’error’(而不是’info’)级别的消息,只需要打开命令行输入:

php receive_logs_direct.php warning error > logs_from_rabbit.log

如果你想在屏幕上输出所有的消息,打开一个新的终端,输入:

php receive_logs_direct.php info warning error
[*]Waiting for logs.To exit press CTRL+C

例如,发送error消息,输入:

php emit_log_direct.php error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'

emit_log_direct.php源码 receive_logs_direct.php源码

转到第五节,查看如何监听基于模式的消息。

原文地址:Routing

PHP RabbitMQ 教程(三) - 发布/订阅

发布/订阅

我们在上一节创建了一个工作队列,并假定队列对应的任务传送给了某个客户端。在这一章节我们会做一些完全不一样的东西–我们会发送一条消息到多个消费者,也称之为“发布/订阅”模式。

为了说明这个模式,我们会创建一个简单的日志系统(logging system,以下简称日志系统),它由两个程序组成–第一个是发送日志信息,第二个是接收日志并打印。

日志系统的每一个运行的接收端程序都会接收信息,这样就可以运行一个接收端就把日志保存到硬盘里,同时运行另一个接收端去实时显示日志到屏幕。

本质上,日志内容是广播给所有的接收端的。

交换器

在之前的章节中我们从一个队列里发送和接收消息,现在该把完整的RabbitMQ消息模型介绍给大家了。

让我们快速的回看一遍在之前的章节中的内容:

>生产者是一个用来发送消息的程序

>队列是一个存储消息的缓冲区

>消费者是一个接收消息的程序

RabbitMQ消息模型的核心思想是,生产者永远不会直接发送给任何消息队列,实际上,生产者一般情况下甚至不知道消息应该发送给哪个队列。

生产者只能发送消息到交换器中,交换器非常简单。一方面从生产者接收消息,另一方面把消息推送到队列中。交换器必须知道如何处理接收到的消息,是推送到某个队列?推送到多个队列?还是丢弃这条消息。这个规则通过交换器类型(exchange type)来指定。

这里是交换器的几个类型:direct,topic,headers,fanout。这里我们主要关注最后一个–fanout,创建一个类型为 fanout 的交换器,命名为 logs。

$channel->exchange_declare('logs','fanout',false,false,false);

fanout交换器非常简单,你可以从名称中猜出它的功能,它把所有接收到的消息广播给所有它知道的队列,这也正是我们的日志系统需要的功能。

列出交换器

可以使用rabbitmqctl 命令列出服务器上的所有交换器:

sudo rabbitmqctl list_exchanges

Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

结果中有一些amq.*和一些未命名的交换器,这是一些默认创建的交换器,它们不太可能是现在需要用到的。

未命名交换器

在之前的章节中我们对交换器一无所知,直到可以发送消息给队列。大概是因为我们当时正在使用一个以空字符串“”定义的默认的交换器。

回想一下之前怎么发布消息:

$channel->basic_publish($msg,'','hello');

这里就是使用默认或者说未命名的交换器:消息被routing_key的值 Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish

现在,可以发布消息到这个队列。

$channel->exchange_declare('logs','fanout',false,false,false);
$channel->basic_publish($msg,'logs');

临时队列

也许你还记得在之前我们使用了一个指定的队列(还记得 hello 队列 和 task_queue 队列吗?)。可以命名一个队列是至关重要的–我们需要指定一个worker到同一个队列。当想让生产者和消费者使用同一个队列时给队列命名是非常重要的。

但是在我们的日志系统中情况不同了,我们想要接收所有的消息,不仅仅是其中的一部分,我们关心的是最新的消息而不是旧的,因此需要做两件事。

首先,当连接到RabbitMQ时,需要一个空的队列,可以手动创建一个名字随机的队列,或者,更好的办法是,让服务器为我们随机选一个队列名字。

其次,一旦与消费者失去连接,队列需要自动删除。

php-amqplib中,当我们创建了一个名字为空的队列时,实际上是创建了一个被生成了名字的非持久化的队列。

list($queue_name, ,) = $channel->queue_declare("");

方法执行后,$queue_name变量包含了一个RabbitMQ生成的字符串。比如也许是这样的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

当连接被关闭的时候,队列也会被删掉,因为队列是独有的。

绑定(Bindlings)

我们已经创建了一个fanout类型的交换器和一个队列。现在需要让交换器发送消息给队列。交换器和队列之间的关系称之为绑定(binding)

$channel->queue_bind($queue_name,'logs');

现在开始,logs 交换器会把消息附加到队列中。

列出绑定(Listing bindings)

可以使用 rabbitmqctl list_bindings列出所有存在的正在使用的绑定。

整合

发送日志消息的生产者,与之前的代码看起来没什么不同,最重要的变化是现在想要发送消息到我们的 logs 交换器中,需要在发送时提供一个routing_key,但是在 fanout类型的交换器中这个值是可以忽略的。下边是emit_log.php的代码。

<?php

require_once __DIR__ .'/verdor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $channel->channel();

$channel->exchange_declare('logs','fanout',false,false,false);

$data = implode(' ',array_slice($argv,1));

if(empty($data)) $data = "info:Hello World";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg,'logs');

echo "[x]Sent ",$data,"\n";

$channel->close();
$connection->close();
?>

(emit_log.php)

如你所见,建立连接后声明了交换器,这一步是必须的,因为发送消息到一个不存在的交换器是被禁止的。

如果还没有队列绑定到交换器,信息会丢失,但是这对于我们是可以的,如果没有消费者监听,我们可以安全的丢弃消息。

receive_logs.php:

<?php

require_once __DIR__ .'/vendor/autoload.php';
use PhpAmqpLib\Connection\QMAPStreamConnection;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_bind($queue_name,'logs');

echo '[*] Waiting for logs. To exit press CTRL+C',"\n";

$callback = function($msg){
	echo '[x]',$msg->body,"\n";
};

$channel->basic_consume($queue_name,'',false,true,false,false,$callback);

whild(count($channel->callbacks)){
	$channel->wait();
}

$channel->close();
$connection->close();
?>

(receive_logs.php)

如果想保存日志到文件中,可以在命令中输入

php receive_logs.php > logs_from_rabbit.log

如果想在屏幕上查看日志,新打开一个终端并运行:

php receive_logs.php

发送日志:

php emit_log.php

使用 rabbitmqctl list_bindings 可以确认代码确实创建了绑定和队列,当两个receive_logs.php在运行的时候会看到类似这样的:

sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

对于结果的解释很简单,logs交换器中的数据发送到两个服务器指定的队列,而这正是我们要实现的。

想要弄明白怎样去监听部分消息,转到第四部分。

原文地址:Publish/Subscribe

PHP RabbitMQ 教程(二) - 工作队列

工作队列

(使用php-amqplib库)

在本教程第一部分 我们已经写完了从一个指定队列发送和接收消息的程序。在这一章节中,我们会创建一个工作队列(Work Queue)来分发耗时的任务给多个工作者(worker)。

工作队列(也被称为 任务队列-task queue)主要是避免立即执行资源密集型任务并且还要等待它执行完毕。相反,需要让任务稍后执行,我们把一个任务当做一条信息发送给队列,后台运行的工作者(worker)会取出任务并执行,当运行多个worker时任务会在它们之间共享。

这个概念在web应用中非常有用,可以在短暂的HTTP请求期间处理一些复杂的任务。

准备工作

在前面的部分我们发送了一条内容为“Hello World”的信息,现在我们会发送一些字符串,把这些字符串当做复杂的任务,我们并没有一个实际的任务,像是图片缩放,或者转换PDF文件,所以我们使用sleep方法来假设任务很繁忙。我们会在字符串中加入一些“.”来表示复杂复杂程度;每一个“.”表示需要耗时1秒,比如,“Hello …”代表需要耗时3秒。

我们从上一节的基础上稍微改动了一下send.php,来允许消息可以从命令行发送,这个程序会发送任务到队列中,把它命名为new_task.php

$data = impllode(' ',array_slice($argv,1));
if(empty($data))$data = "Hello World";
$msg = new AMQPMessage($data,
	array('delivery_mode'=>2)#设置消息持久化,下边会讲到。
);
$channel->basic_publish($msg,'','task_queue');
echo "[x] Sent ",$data,"\n";

上一节的receive.php也需要一些改动:需要为消息中的每一个“.”模拟1秒的工作。它会从队列中取出消息并运行,把它命名为worker.php:

$callback = function($msg){
	echo "[x] Received ",$msg->body,"\n";
	sleep(substr_count($msg->body,'.'));
	echo "[x] Done","\n";
	$msg->delivery_info['channel]->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_gos(null,1,null);
$channel->basic_consume('task_queue','',false,false,false,false,$callback);

注意我们伪造的任务需要花费时间(即发送的字符串中要有一些”.“)

然后运行:

php new_task.php "A very hard task which takes two seconds.."
php wordker.php

轮询分发

使用工作队列的一个好处就是它能够并行的处理队列。如果有太多工作需要处理,只需要添加新的worker就可以了。

首先,我们试着同时运行两个worker.php,它们都会从队列接收到消息,但是到底是不是这样呢?我们看一下。

此时需要打开3个终端,其中两个运行worker.php,这两个就是我们的消费者 - C1和C2。

shell1

php worker.php
[*] Waiting for messages. To exit press CTRL+C

shell2

php worker.php
[*] Waiting for messages. To exit press CTRL+C

在第三个终端中我们会发送新的任务,消费者程序开始运行后就可以发送一些消息了。

shell3

php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

我们看一下发送给worker的是什么:

shell1

php worker.php
[*] Waiting for messages.To exit press CTRL+C
[x]Received 'First message.'
[x]Received 'Third message...'
[x]Received 'Fifth message.....'

shell2

php worker.php
[*] Waiting for messages.To exit press CTRL+C
[x]Received 'Second message.'
[x]Received 'Fourth message...'

RabbitMQ会默认按顺序把消息发送给下一个消费者,平均每个消费者都会得到一样多数量的消息,这种分发消息的方式叫做轮询。试着添加三个或更多个worker来运行。

消息响应

执行一个任务会消耗一定的时间,也许你想知道如果一个消费者在执行一个耗时较长的任务时但是在执行一部分的时候挂掉会发生什么。在我们当前的代码中,一旦RabbitMQ把消息分发给消费者便会立即从内存中移除。这种情况下,如果停止一个worker,它正在处理的消息就会丢失。同时其他所有发送给这个worker的还没有处理的消息也会丢失。

但是我们不想丢失任何任务,如果一个worker挂掉,需要把任务发送到另一个worker。

为了确保消息永不丢失,RabbitMQ支持消息响应(message acknowledgements),消费者会发送一个响应告诉RabbitMQ已经收到了某条消息,并且已经处理,这样RabbitMQ就可以删掉它了。

如果一个消费者程序在未发送响应之前挂掉了(频道关闭,链接关闭,或者TCP连接丢失),RabbitMQ会认为消息没有完全处理然后会重新推送到队列中。如果此时有其他的消费者程序在运行,RabbitMQ会很快把消息发送给另一个消费者。这样就可以确保消息不会丢失,即使worker偶尔挂掉。

消息是没有超时的概念的,当worker断开连接的时候,RabbitMQ会重新发送消息,这样在处理一个耗时较长的消息任务时就不会出现问题了。

消息响应默认是关闭的。可以通过设置basic_consume的第四个参数为false(true表示不开启应答),然后在处理完任务的时候从worker发送一个正确的响应内容。

$callback = function($msg){
	echo "[x] Received ",$msg->body,"\n";
	sleep(substr_count($msg->body,'.'));
	echo "[x] Done","\n";
	$msg->delivery_info['channel]->basic_ack($msg->delivery_info['delivery_tag]);
};
$channel->basic_consume('task_queue','',false,false,false,false,$callback);

这样我们就可以确保当你CTRL+C杀掉一个正在处理消息的worker的时候,消息并不会丢失。在这个worker挂掉之后,所有未响应的消息就会发送。

忘了响应

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息会在程序退出后重新发送(可能看起来像是随机返还 原文:which may look like random redelivery),但是如果它不释放未响应的消息,RabbitMQ就会占用越来越多的内存。

为了排除这种错误可以使用rabbitmqctl来打印messages_unacknowledges字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Listing queues ...
hello    0       0
...done.

消息持久化

我们已经学习了确保即使消费者程序挂掉,任务也不会丢失。但是任务还是会在RabbitMQ服务停止的时候丢失。

当RabbitMQ退出或崩溃,它会丢失之前所有的队列和消息,除非你特意告诉它。所以我们必须把队列和消息设为持久化。

首先,为了队列不丢失,需要把它声明为持久化(durable),所以修改queue_declare的第三个参数为true:

$channel->queue_declare('hello',false,true,false,false);

尽管这行代码本身是正确的,但是仍然不会正确运行。因为在之前已经定义过一个非持久化的 hello 队列。RabbitMQ不允许使用不同参数重新定义一个已经存在的队列,它会返回一个错误。但是可以用一个快捷的方法去解决,定义一个不同名字的队列,比如 task_queue:

$channel->queue_declare('task_queue',false,true,false,false);

需要把生产者和消费者程序都设置为 true。

这时候,我们就可以确保在RabbitMQ重启之后task_queue队列不会丢失。现在需要设置消息持久化了 - 通过设置AMQPMessage的属性数组中消息属性 delivery_mode = 2来达到。

$msg = new AMQPMessage($data,
		array('delivery_mode'=>2) //设置消息持久化
	);

关于消息持久化的说明

设置消息持久化并不能完全保证消息不会丢失。这只是告诉让RabbitMQ要把消息保存到硬盘,但是从RabbitMQ接收到消息到保存完成仍然还有一个短暂的间隔时间。因为RabbitMQ并不是每一条消息都会使用fsync(2),可能只是保存到缓存中而不是真正的写到磁盘里。并不能保证消息真正的持久化,但是对于简单的工作队列已经足够了。如果你需要更健壮的持久化,可以使用publisher confirms机制。

公平分发

也许你注意到它仍没有像我们想的那样去派发任务,比如在两个worker的情况下,处理奇数消息的比较繁忙,处理偶数消息的比较轻松,一个worker不断的忙碌而另一个几乎不需要工作,但是RabbitmQ并不知道这些,并且继续一如既往的派发消息。

这是因为RabbitMQ在消息进入队列的时候只管去派发,并不管消费者未做出响应的消息数。它只是把每第n条消息发送给第n个消费者。

我们可以使用basic_qos方法,并设置prefetch_count = 1。这样是告诉RabbitMQ在同一时刻不要发送超过1条消息给一个worker,或者说,不要发送新的消息给worker直到它已处理完上一条消息并作出了响应。这样,它就会把消息发送给下一个空闲的worker了。

$channel->basic_qos(null,1,null);

注意队列长度

如果所有的worker都处于忙碌状态,队列就会填满,你需要留意,添加更多的worker,或者使用其他的策略。

整合

最终,new_task.php的代码如下:

<?php

require_once __DIR__ .'/verdor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue',false,true,false,false);

$data = implode(' ',array_slice($argv,1));

if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data,
	array('delivery_mode'=>2) // 消息持久化
);
$channel->basic_publish($msg,'','task_queue');

echo "[x]Sent ", $data, "\n";

$channel->close();
$connection->close();
?>

new_task.php源码

worker.php

<?php

require_once __DIR__ .'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue',false,true,false,false);

echo '[*] Waiting for messages.To exit press CTRL+C',"\n";

$callback = function($msg){
	echo "[x]Received ",$msg->body,"\n";
	sleep(substr_count($msg->body,'.'));
	echo "[x]Done","\n";
	$msg->delivery_info['chennel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null,1,null);
$channel->basic_consume('task_queue','',false,false,false,false,$callback);

while(count($channel->callbacks)){
	$channel->wait();
}

$channel->close();
$connection->close();
?>

worker.php

使用消息应答和prefetch_count=1后,就可以运行一个工作队列了,持久模式选项会在即使RabbitMQ重启的情况下保留任务。

现在我们可以继续学习第三部分的内容,学习如何发送相同的消息给多个消费者。

原文地址:Work queues

PHP RabbitMQ 教程(一) - 介绍

准备工作

先决条件

本教程先决条件是RabbitMQ已经安装并正在以5672端口运行在 localhost,如果你使用了不同的域,端口,用户,密码,连接配置需要适当改变。

获得帮助

如果在本教程中遇到问题,可以通过邮件列表进行联系。

介绍

RabbitMQ是一个消息代理,它的本质是,从producers(生产者)接收消息,然后发送给consumers(消费者),在这个过程中,可以根据自己的配置规则使用路由,缓冲区,保存消息。

通常的,RabbitMQ,信息传送(messaging),使用一些专业术语。(RabbitMQ, and messaging in general, uses some jargon.)

>生产(Producing)仅仅意味着发送,发送信息的程序叫做生产者(producers),以下图表示:

>队列就是一个信箱的名字,存在于RabbitMQ内部,虽然消息在RabbitMQ和你的应用之间传输,但是只能存在于队列里,队列没有大小限制,它可以存储尽可能多的消息,本质上它是一个无限大的缓冲区,多个producers(生产者)可以通过一个队列发送消息,多个consumers(消费者)也可以尝试从一个队列接收消息,队列以下图表示,队列的名字在图的上边:

>consumers(消费者)的意思与接收相似,消费者主要是等待接收消息的程序,以下图表示:

需要注意的是,生产者,消费者,和代理,不需要一定在一台机器上,事实上在大多数情况下他们确实不在一台机器上。

“Hello World”

(使用php-amqplib库)

在这一部分,我们使用PHP写两段程序,一个生产者发送一条消息,一个消费者接收消息并打印出来。我们会忽略一些php-amqplib API的细节,从简单的事情开始学习,这是一段内容为“Hello World”的消息。

在下边的示意图中,“P”是生产者,“C”是消费者,中间的盒子是队列 — 一个RabbitMQ代表消费者的消息缓冲区。

php-amqplib库

RabbitMQ支持很多协议,本教程包含AMQP 0-9-1,一个开放,通用信息协议,RabbitMQ支持Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等多种语言(详见这里),在本教程中我们使用php-amqplib,使用Composer 管理依赖。

添加一个composer.json文件到你的项目目录。

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

如果你已经安装了 Composer ,可以运行如下的代码:

composer.phar install

这是一个Windows系统下的Composer安装文件。

现在我们已经安装了php-amqplib,可以写程序了。

发送

新建一个send.php作为发送端,receive.php作为接收端,发送端会连接RabbitMQ,发送一条信息,然后退出。

在send.php中,需要引用php-amqplib库,和使用其中的一些必要的类。

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

接下来,建立到RabbitMQ服务器的连接:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

这里我们使用socket进行连接,处理协议和鉴定,这样就已经连接到了本机的代理,如果想要连接不同的主机,只要更改localhost为该主机的名称或IP地址即可。

下一步,建立频道,大部分API的工作都在这完成。

要想发送信息,需要声明一个队列,之后可以向这个队列里发布消息。

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

声明队列是幂等的 — 它仅在不存在的时候才会被创建,如果存在也不会受影响。消息内容是一个字节数组(byte array),所以可以发送任何内容。

最后,关闭频道和连接。

$channel->close();
$connection->close();

这是send.php类的完整内容。

发送失败

如果这是第一次使用RabbitMQ,并且没有看到“Sent”信息(即“ [x] Sent ‘Hello World!”),也许你抓耳挠腮的想知道为什么出错了,也许是代理没有足够的硬盘空间(默认情况下需要至少1G的空间)导致拒绝接收信息。检查日志文件,有必要的花调低限值。这个配置文件文档将会展示给你如何设置disk_free_limit。

接收

收件人,与发送者只发送一条消息不同,接收者会一直运行以监听信息并输出。

receive.php中与send.php中的 include和use 部分的代码一样。

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

设置连接与send.php一样,打开连接和频道,命名一个队列,需要注意的是,队列名需要与send.php所发布的队列的名字一致。

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

注意,我们在此声明了一个队列,因为有可能会在send程序开启前先开启receive程序,我们想要确保在试着接收消息之前队列就已经存在了。

下一步,告诉服务器去从队列传送消息,我们会定义一个用于从服务器接收消息的函数,记住,消息会异步的从服务器发送到客户端。

$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

此处使用while方法,当收到消息时,会把收到的消息传入到$callback方法里。

这是receive.php类的全部内容。

现在我们可以运行两段脚本了,在命令行里,执行sender程序。

php send.php

然后,执行receiver程序

php receive.php

receiver程序会把通过sender程序发送的内容打印出来,receiver程序会一直运行,监听新消息(使用ctrl+c停止),所以试着运行sender程序从另一个命令行。

如果想查看队列,可以运行rabbitmqctl list_queues。

Hello World!

查看第二部分,建立一个简单的队列。

原文地址:“Hello World!”

一段有意思的JS For循环代码

var a =0;
for (var i =0,j =0; i <10,j<6;i++,j++) {
	a = i+j;
	console.log("i="+i);
	console.log("j="+j);
	console.log("a="+a);
}

朋友面试,遇到的面试题,电话跟我说完就回来写下来试了一下,结果与自己预想的一样。

没写之前,电话里说的思路是,因为 j 限制了 小于 6,那么应该只循环6次,实际上也确实是这样。

所以最终结果,a = 10。

但是想不明白为什么会有这样的面试题,或者说,什么样的情况会需要写这样的代码呢?

MySQL实现row_number第二部分

接上篇:MySQL实现row_number

为分组增加row number

row_number的分析函数怎么样?(原文:How about row_number “over partition by” functionality? ),比如,如果想为每一个组增加row number,并且在每一个新的分组重置它。

查看下图中payments表。

SELECT
    customerNumber, paymentDate, amount
FROM
    payments
ORDER BY customerNumber;

设想下,为每一条customer数据增加一个当前行的编号,并且每当customer的number字段变化的时候行编号都被重置。

为了实现这个,需要使用两个临时变量,一个作为当前行编号,另一个用来存 上一条customer number与当前行的number进行对比,查询语句如下。(这句有点绕,可能翻译的不太准确,请以原文为准。To achieve this, you have to use two session variables, one for the row number and the other for storing the old customer number to compare it with the current one as the following query:)

SELECT 
    @row_number:=CASE
        WHEN @customer_no = customerNumber THEN @row_number + 1
        ELSE 1
    END AS num,
    @customer_no:=customerNumber as CustomerNumber,
    paymentDate,
    amount
FROM
    payments
ORDER BY customerNumber;

在这段查询代码中,我们使用了CASE声明,如果customer的number不变,就给row_number变量+1,否则,重置为1,结果如下:

与为每一行记录生成row_number一样,也可以使用派生表(derived table)和交叉连接(cross join)生成同样的结果。

SELECT 
    @row_number:=CASE
        WHEN @customer_no = customerNumber THEN @row_number + 1
        ELSE 1
    END AS num,
    @customer_no:=customerNumber as CustomerNumber,
    paymentDate,
    amount
FROM
    payments,(SELECT @customer_no:=0,@row_number:=0) as t
ORDER BY customerNumber;

本教程中,我们展示了如何在MySQL中模仿row_number方法。

原文地址:MySQL row_number, This Is How You Emulate It

MySQL实现row_number

在本教程中,我们将在MySQL中实现一个非常实用的row_number功能。

row_number是一个返回数据排序编号的排名方法,从1开始。我们经常需要用到row_number去生成某些报表,不幸的是,MySQL并不像MSSQL,Oracle一样支持这个方法。在MySQL中如果想实现这个功能需要临时变量。

为了在MySQL中实现row_number,需要在查询中实用临时变量,下图中从employees表中查询出5条数据,并且从1开始,为每一行添加了编号(row number)。

SET @row_number = 0;
 
SELECT 
    (@row_number:=@row_number + 1) AS num, firstName, lastName
FROM
    employees
LIMIT 5;

在上面的查询中:

首先,我们定义了一个变量叫做row_number,初始值为0,row_number是以@为前缀的一个临时变量。

然后,在查询中,我们为这个变量每次+1,LIMIT分句是为了限制返回的结果数,此处设为5.

另一个方法是使用临时变量作为派生表,联合主表。看下边的查询语句:

SELECT 
    (@row_number:=@row_number + 1) AS num, firstName, lastName
FROM
    employees,(SELECT @row_number:=0) AS t
LIMIT 5;

需要注意,派生表必须使用别名,以使查询语句在语法上没有问题。

由于时间关系(回家过年),暂时到这里,文中部分内容可能翻译的有点不太顺,可以直接看下原文,原文中还有一部分讲分组查询中实用row_number。

这个之后再写。

原文地址:MySQL row_number, This Is How You Emulate It

ECharts饼图示例

今天写一个做地区分布的饼图的东西。

由于数据库里本身没有存储地区数据,只有IP,所以也用到了高春辉老师目前的项目里提供的IP地址数据库。可以在数据库下载页面下载到免费的数据库文件和一个PHP处理类。

还用到了百度团队开源的ECharts,很简单很好用。

其实事情本身并没有难度。

约定

默认为:此时已有一个静态页面,并且已引入echarts的js文件。

先读取数据


	$sql = "SELECT id,ips FROM table order by id desc limit 100000";
	
	$pdo = new PDO('mysql:host=127.0.0.1;dbname=db1','root','root');
	
	$query = $pdo -> query($sql);
	$query->setFetchMode(PDO::FETCH_ASSOC);
	$rs = $query->fetchAll();	
	

根据IP得出所在地信息


	include './IP.class.php';//ipip.net提供的IP处理类
	$IP = new IP();
	foreach ($rs as $key => $value) {
		$a = $IP->find($value["ips"]);
		$arr[$key] = $a[1].$a[2]; //正确的IP返回的数据为:Array ( [0] => 中国 [1] => 黑龙江 [2] => 鹤岗 [3] => ) ,这里根据实际情况取对应字段即可。
	}
	

生成echarts所需的数据

	$data = array_count_values($arr);

嗯,一个非常简单粗暴的把同名地区数量算出来的方法。这样就生成了如array(‘北京’=>2,‘天津’=>3)这样的数据。正是echarts所需的。

饼图区域

	<div id="main" style="width: 1400px;height:1400px;"></div>

正经的JS来了

	var myChart = echarts.init(document.getElementById('main'));
	
	var option = {
       
        title : {
	        text: 'IP分布地区',
	        x:'center'
	    },
	    tooltip : {
	        trigger: 'item',
	        formatter: "{a} <br/>{b} : {c} ({d}%)"
	    },
	    legend: {
	        orient: 'vertical',
	        left: 'left',
	        //简单起见此处直接foreach了
	        data: [<?php foreach ($keys as $key => $value) { echo '"'.$value.'",'; }?>]
	    },
	    series : [
	        {
	            name: 'IP所在地区',
	            type: 'pie',
	            radius : '40%',
	            center: ['50%', '60%'],
	            data:[
	            	//同上,
	            	<?php 
	            		//reset($data);
						while (list($key, $val) = each($data)){
							echo '{value:'.$val.',name:"'.$key.'"},';
						}
	            	?>
	            ],
	            itemStyle: {
	                emphasis: {
	                    shadowBlur: 10,
	                    shadowOffsetX: 0,
	                    shadowColor: 'rgba(0, 0, 0, 0.5)'
	                }
	            }
	        }
	    ]
    };
    
    // 使用刚指定的配置项和数据显示图表。
    myChart.setOption(option);

打开页面看一下吧。

一个简单的input获得焦点时的小动画效果

以前经常见到的一种页面表单效果,但是并没有在项目中用过,前几天在sf上看到相关问题,顺手写了一个,还挺简单,没什么含量。

此处省略HTML头信息等内容。

HTML

	<div>
	  <input type="text" id="uname"/>
	  <label>Name</label>
	</div>

CSS

div{
  position:relative;
  top:20px;
}
input{
  width:200px;
  height:30px;
  line-height:30px;
  padding:0 3px;
  border:none;
  border-bottom:1px solod #666;

}
label{
  position:absolute;
  top:0;
  left:5px;
  color:#ccc;
  height:30px;
  line-height:30px;
}

JS

$(document).on('focus','input',function(){
  $(this).siblings('label').animate({top:'-20px'});
})
.on('focusout','input',function(){
  $(this).siblings('label').animate({top:'0'});
})

RESULT

不足

没有考虑label的鼠标点击事件。

crontab实现定时备份数据库

crontab命令之前写过了,在Linux crontab 访问PHP URL完成定时任务,今天写了一个用来备份数据库的脚本。

主要会用到以下几个命令:

mysqldump

参考文章:mysqldump导入导出数据库总结

创建.sh文件:

cd ~
vi backup.sh

backup.sh内主要内容如下:

mysqldump -hlocalhost -uroot -p'root' --databases database1 | gzip > /var/backups/databases-database1`date +'%Y%m%d_%H%M%S'`.sql.gz

首先用mysqldump命令

	1.连接数据库
	2.选择要备份数据库
	3.选择存储备份文件的方式,这里使用了gzip了生成一个压缩包

根据文档,如果想备份所有数据库,可以使用

mysqldump -hlocalhost -uroot -p'root' --all-databases | gzip > /var/backups/databases-all-database`date +'%Y%m%d_%H%M%S'`.sql.gz

保存。

有备份,就会有备份后的处理,显而易见的问题是备份多了会比较占空间,并且也用不到那么多备份。所以备份完成删除掉一段时间以前的就可以了。这步也可以在备份前做,无所谓。

这里又用到了find命令

参考文章:Linux中find常见用法示例

删除之前的备份文件

在刚才的backup.sh中继续输入:

cd /var/backups/
rm -rf `find . -name '*.sql.gz' -mtime +10`

这句命令有两部分,

第一部分是删除命令:’rm -rf’。就是那句一定要慎用的命令了.

第二部分是找到:当前目录,名字以’.sql.gz’结尾的,更改时间在10天以前的文件。

’.‘表示当前目录,由于上一句是’cd /var/backups/‘,所以这里使用当前目录即可。

’-name ‘和’-mtime’参数是find命令的条件。

具体的说明,和其他条件可以参考前边说到的文章。

到这里基本上备份脚本就完成了。

但是为了什么时候突然想看一下日志,或者备份出错的时候查问题,还可以在脚本里加上记录日志的命令:

日志

echo 'Begin Backup Database At :' `date +'%Y-%m-%d %H:%M:%S'`

这里又用到了date。

参考文章:Linux下date命令,格式化输出,时间设置

脚本保存之后,记得添加执行权限;

sudo chmod +x backup.sh

接下来就是在系统里添加crontab任务了。

cd /etc/
vi crontab

在文件末尾,加上

m  h dom mon dow user	command
00 5 * * * root /home/yourname/backup.sh >> /var/log/backup.log

这样,backup.sh里的echo就会输出到/var/log/backup.log中了。

Over。