Mac brew php7.1环境下安装Yaf

开发机一直使用brew来安装PHP及其他的环境,今天把PHP升到7.1,由于7.1版本下还没有yaf的源,所以无法使用brew安装,只能编译安装了。

首先下载yaf,解压,进入目录。

git clone git@github.com:laruence/yaf.git

$(brew --prefix homebrew/php/php71)/bin/phpize

./configure --with-php-config=$(brew --prefix homebrew/php/php71)/bin/php-config

make && make install

make test

$(brew –prefix homebrew/php/php71) 即 brew info php71结果中的path值。

由于brew安装PHP会在php.ini同级目录创建conf.d目录,并把扩展的配置文件写在这里,一目了然知道都安装了哪些扩展,所以也以同样方式在此目录创建ext-yaf.ini。

make install 后会显示,具体路径可能会不一样。

Installing shared extensions:     /usr/local/Cellar/php71/7.1.0-rc.5_9/lib/php/extensions/no-debug-non-zts-20160303/

这个目录即扩展.so的存放目录。下边会用到。

[yaf]
extension="/usr/local/opt/php71/lib/php/extensions/no-debug-non-zts-20160303/yaf.so"
yaf.environ="dev"
;yaf.use_namespace = 1

至此,重启php-fpm就可以了。

图片来自:Emergence of PHP7

用Docker部署Golang Beego框架应用

Docker是什么就不说了。 Golang是什么也不说了。 Beego是什么就更不用说了。

最近Beego项目完成,研究怎么部署。因为Docker部署起来更简单更快速,所以就说下怎么在docker里部署beego应用。

写在前面

假设你的应用路径为 /go/app; 假设已配置好docker的相关东西。 假设使用 godep 作为依赖管理工具 示例中开放端口为80,需要与app.conf中的端口一致,可以自行修改。

配置

在 /go/app 目录新建Dockerfile。

FROM golang:1.7.1-alpine

MAINTAINER youremail <youremail@xxx.com>

RUN apk add --update go git

ADD ./ /go/src/app

RUN cd /go/src/app \
	&& go get github.com/astaxie/beego \
	&& go get github.com/tools/godep \
	&& godep update -goversion \
	&& godep get \
	&& godep save \
	&& go build

EXPOSE 80

EXTRYPOINT /go/src/app/app

本例使用 golang:1.7.1-alpine 作为基础镜像。golang的所有镜像见这里

构建

docker build -t app .

运行

docker run -d -p 8080:80 app

访问

使用nginx反向代理访问docker中的go应用。

server {
    listen       80;
    server_name  app.com;

    charset utf-8;
    access_log  logs/app.access.log;

    location / {
        try_files /_not_exists_ @backend;
    }
    if (!-e $request_filename) {
        return 404;
    }

    location @backend {
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host            $http_host;

        proxy_pass http://192.168.99.100:8080; // 192.168.99.100为docker machine的ip,8080为 docker run 时指定的本地端口。
    }
}

相关资料

如何使用Docker快速部署go-web应用程序 Deploying Go servers with Docker 如何使用Docker部署Go Web应用程序 The Easiest Way to Develop with Go — Introducing a Docker Based Go Tool How To Deploy a Go Web Application with Docker nginx 部署

Laravel 手动创建分页

有些情况下会从接口读取数据,数据较多时会用到分页,Laravel为这种需求提供了很方便的方法。

官方文档里几句略过,并没有详细说明,经过查找资料,发现如下方法可行。

首先use LengthAwarePaginator

use Illuminate\Pagination\LengthAwarePaginator;
use Illuminate\Support\Collection;

假设原内容是:

$result = [
    'item1',
    'item2',
    'item3',
    'item4',
    'item5',
    'item6',
];

对于一个列表来说,item一般会是个array,这里忽略。

情况1,已知总数,只有部分数据

由于本人所使用的接口有页码和每页数量的参数,所以每次查询返回的其实就是每一页的内容了,而接口又返回了符合条件的总数count,所以使用如下方式即可:

$perPage = 10;
$count = 100;//假设这里是接口返回的总数
//创建collection
$collection = new Collection($data);

$currentPageResults = $collection->all();

//生成分页
$data = new LengthAwarePaginator($currentPageResults, $count, $perPage);
//设置分页的链接
$data->setPath($request->url());

情况2,未知总数,有全部数据

而如果$data是全部数据呢,比如100条数据全部返回,然后要生成一个每页10条记录的分页,可以这样做:

//获取当前页码
$currentPage = LengthAwarePaginator::resolveCurrentPage();

//从数组创建一个laravel collection
$collection = new Collection($searchResults);

//设置每页数量
$perPage = 10;

//从collection分割数据
$currentPageSearchResults = $collection->slice($currentPage * $perPage, $perPage)->all();

//生成分页
$paginatedSearchResults= new LengthAwarePaginator($currentPageSearchResults, count($collection), $perPage);
//设置分页的链接
$data->setPath($request->url());

区别

其实两者只是相差了一次分割数据。

最后

在视图里依然使用

{!! $data->render() !!}

输出分页组件。

看起来还挺简单的。

参考链接:

官方文档pagination

Custom data pagination with Laravel 5

CUSTOM PAGINATION VIEW IN LARAVEL 5 WITH ARRAYS

Golang XMl 转 JSON

起因

某个上古时代的API,依然在返回XML格式的数据,更奇葩的是,GBK格式的。

用Go顺利的写到了发送数据,接收数据,然后取值有点麻烦啊。。。。

各种Google后,终于解决,但是不保证是唯一,正确,最合适的答案。

说在前边

本文假设要解析的XMl数据为:

<?xml version="1.0" encoding="GBK" ?>
<response>
    <status>200</status>
</response>

要解决的问题是取出“200”这个状态值。

导入包

解析XML使用了”encoding/xml”这个包。

所以先导入这个包。

import "encoding/xml"

定义struct

定义一个自定义类型的Response

type Response struct {
    Status int `xml:"status" json:"status"`
}

定义一个Response类型的变量

var result Response

偷懒转格式

因为”encoding/xml”不支持GBK格式的XML,而返回的内容又固定标明了编码是GBK,所以这里偷懒,直接把GBK替换成UTF-8,本例中不影响结果。

xmlstr := `?xml version="1.0" encoding="GBK" ?>
<response>
    <status>200</status>
</response>
`
xmlstr = strings.Replace(xmlstr, "GBK", "UTF-8", -1)

使用strings包,替换“GBK”,相信根据参数顺序能看出各个参数的意义,最后一个参数:-1,为替换全部,即字符串中所有出现的第二个参数全部替换。

解析,转换,取值

使用encoding/jon,go-simplejson包

//解析XML
err := xml.Unmarshal([]byte(xmlstr), &result)

if nil != err {
  log.Fatal(err)
}
log.Printf("XML:%v \n", result) 

//转换成JSON
res, err := json.Marshal(result)

if nil != err {
  log.Fatal(err)
}
log.Printf("JSON:%s \n", res)

js, err := simplejson.NewJson([]byte(res))

if nil != err {
  log.Fatal(err)
}
status, err := js.Get("status").Int()

log.Printf("STATUS:%v \n", status)

以上是本人在处理XML 转 JSON 时的解决办法,应该还有更简单更合适的方案。仅供参考。

完整代码:

package main
import (
    "encoding/xml"
    "encoding/json"
    "log"
    "strings"
    simplejson "github.com/bitly/go-simplejson"
)
type Response struct {
    Status int `xml:"status" json:"status"`
}

func main() {
    
    var result Response

    //多行字符串,使用反引号`
    xmlstr := `<?xml version="1.0" encoding="GBK" ?>
<response>
    <status>200</status>
</response>`

    xmlstr = strings.Replace(xmlstr, "GBK", "UTF-8", -1)

    err := xml.Unmarshal([]byte(xmlstr), &result)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("XML:%v",result)

    r, err := json.Marshal(result)
    if nil != err {
        log.Fatal(err)
    }

    log.Printf("JSON:%s", r)

    js, err := simplejson.NewJson([]byte(r))

    if nil != err {
        log.Fatal(err)
    }
    status, err := js.Get("status").Int()
    log.Printf("VALUE:%v",status)

}

发现问题:

今天(2016-09-18),再看这段代码,发现跟另一个程序里有些不一样。

另一个程序里是这样的:

type Response struct {
    Status int `xml:"status"
}

也可以正常返回值,但是在本文中的示例却不能正常输出status值,而是会输出空,看了半天发现,使用 log 时:

log.Printf("VALUE:%v",status)

如果struct没有写 “json:“status””,就不能输出,如果换成fmt,struct就可以不写“json:“status”。结果是一样的,其中的原因还要再查资料研究下。

参考文章:

标准库—XML处理(一)

https://play.golang.org/p/7HNLEUnX-m

https://play.golang.org/p/m99B12RaLe

XML处理

Hello Go

Mac 安装 GO

本人使用了Brew来安装。

安装前首先更新Brew。
brew update && brew upgrade
安装Go

使用brew安装go

brew install go
设置$GOPATH

Go从1.1版本开始必须设置这个变量,也就是说通过以上方式安装后就必须要设置$GOPATH了。

这个目录用来存放Go源码,可运行文件,和编译之后的包文件。

Mac系统设置

在bash中加入:

export GOPATH=/your/path

以上目录需要存在,不存在就自己手动创建一个。

然后运行

source ~/.zshrc

替换成你自己使用的shell,如.bashrc等。

然后,需要在/your/path目录下,新建 pkg,bin,src目录。

​ src目录存放源代码,如hello.go

​ pkg目录存放编译后的文件,如hello.a

​ bin目录存放生成后的可执行文件

src目录就是主要开发目录。如hello项目,则在src下新建hello目录。

到这里,就可以开始Go之旅了。

Hello Go

需要注意的是,在Go中,包名和文件名是可以不同的,包名为main的时候即为可以独立运行的包。

在src目录新建hello目录,创建hello.go文件。

package main //包名

import "fmt" //导入一个系统级别的fmt包

//使用func定义函数
//main函数没有参数,没有返回值
func main() {
  fmt.Println("Hello Go") //调用包函数的方法为 pkgName.funcName
}

一个hello.go就写好了,在命令行输入

go run hello.go

就可以看到输出Hello Go了。

PHP Curl 上传文件

有时候会遇到上传文件给第三方服务的情况,比如本身程序并不需要存储附件,而是把附件发送给一个公共的服务。

最近正好碰到这个问题,记录一下。

上代码。

发送端:

<?php

// 接口地址
$api = 'http://api.example.com/uploadfile';
$file = $_FILES['file'];//保存$_FILES到变量中。

// 此处可能存在上传失败等问题,需验证$_FILES["file"]["error"]。
// 做业务对应的规则验证,如文件格式,文件大小等。

// 创建一个 cURL 句柄
$ch = curl_init($api);

// 创建一个 CURLFile 对象
// 上传文件的路径,文件的Mimetype,文件名
$cfile = curl_file_create($file['tmp_name'],$file['type'],$file['name']);

$data = [
	'type'=>'image',
	'data'=>$cfile
];

// 设置 POST 数据
curl_setopt($ch, CURLOPT_POST,1);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

$resp = curl_exec($ch);

if(!$resp) {
  die('Error: "' . curl_error($ch) . '" - Code: ' . curl_errno($ch));
} else {
  echo "Response HTTP Status Code : " . curl_getinfo($ch, CURLINFO_HTTP_CODE);
  echo "\nResponse HTTP Body : " . $resp;
}

// Close request to clear up some resources
curl_close($ch);

接收端:

<?php 
var_dump($_FILES);// 接收文件内容
va_dump($_POST);// 接收type

前端:

前端可使用Form或其他Ajax方式上传。

一个简单的php+redis队列示例

虽然RabbitMQ的坑年前就开始填了,但是并没有机会在项目中实际使用,机缘巧合,换工作后第一个比较重要的事就是做一个直播的页面,如果数据直接插入或读取自数据库,数据库端的压力就太大了,当时RabbitMQ还没看完,而其他的队列程序更是没有用过,只是稍微对Redis熟悉些,于是就使用了Redis做。

Redis比较常见的是作为数据缓存工具使用,数据存储在内存中,减少了数据库的连接和查询,效率高,又方便。

而其实Redis也可以用来做消息队列。

发送

发送端首先当然是做好数据的接收和检测,比如为空的情况下给个默认值或返回错误。

特殊的情况下可能还要正则处理。

处理完插入到Redis的list(列表)中。如果插入失败抛出异常。

<?php

$redis = new Redis();
$redis->connect('127.0.0.1');

//如果redis需要认证
$redis->auth('redispasswd');

//命令行中使用,可以使用把信息作为参数的方式
$data = $argv[1];
if(empty($data)) $data = "Hello World..!":

try{
	//便于调试,在信息后边加上当前时间
	$data .= '-at-'.date('Y-m-d H:i:s');
	$redis->LPUSH('redis_queue_1',json_encode($data));
}catch(Exception $e){
	echo $e->getMessage()."\n";
}

执行

php push.php "testinfo"

接收

由于消息是源源不断发送到队列中,所以接收端程序一般会以后台进程的方式运行。

<?php
$redis = new Redis();
$redis->connect('127.0.0.1');

$redis->auth('redispasswd');

while (true) {

	try {
		$data = $redis->rPopLPush('redis_queue_1','redis_queue_1_bak')."\n";
		if($data){
			echo $data;
		}else{
			echo "没有数据\n";
		}
	} catch (Exception $e) {
		echo $e->getMessage()."\n";
	}

	//每秒读取1次
	sleep(1);
}

执行

php get.php

更进一步

如果想把数据写入到MySQL中,该怎么办?

大多数情况下数据最终是需要持久化的,仅仅存在于redis中,也许内存会不够呢。

改造一下get.php。

<?php
$redis = new Redis();
$redis->connect('127.0.0.1');

$redis->auth('redispasswd');

$dsn = 'mysql:dbname=test;host=127.0.0.1';

try{
	$pdo = new PDO($dsn,'root','mysqlpasswd',array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8'));
}catch(PDOExcetion $e){
	echo $e->getMessage()."\n";
}

while(true){
	$data = $redis->rPopLPush('redis_queue_1','redis_queue_1_bak');
	if($data){
		$sth = $pdo->prepare("INSERT INTO tb_test(msg,time) VALUES (:msg,:time)");
		
		$time = time();
		
		$sth->bindParam(':msg',$data,PDO::PARAM_STR);
		$sth->bindParam(':time',$time,PDO::PARAM_INT);
		
		$sth->execute();
		$id = $pdo->lastInsertId();
      
		echo "插入成功,ID=$id \n";
	}else{
		echo "没有数据 \n";
	}
	
	//实际任务中可能需要尽可能快的把数据导入到MySQL中
	sleep(1);
}

注意

​ 这里使用了PDO;

bindParam方法的第一个参数对应insert语句中的对应顺序的值,如第一个bindParam方法对应第一个value值。

​ bindParam方法第二个参数为实际的值,传入变量或固定值,传入方法调用时如time()会提示:

Strict standards: Only variables should be passed by reference in /www/get.php on line 24。

解决Discuz无法发布爱奇艺视频的问题

最近碰到需要在Discuz论坛中插入爱奇艺视频的问题,之前没关注过,搜索后有些答案说DZ不支持爱奇艺,有些说爱奇艺不支持DZ,并没有真正能解决问题的。

下午突然想到也许是DZ根据粘贴进来的flash地址生成的标签代码不对,试验后发现果然是这个原因。

打卡/static/js/editor.js 文件第1299行查看这段代码:

case 'vid':
	var mediaUrl = $(ctrlid + '_param_1').value;
	var auto = '';
	var posque = mediaUrl.lastIndexOf('?');
	posque = posque === -1 ? mb_strlen(mediaUrl) : posque;
	var ext = mediaUrl.lastIndexOf('.') === -1 ? '' : mediaUrl.substring(mediaUrl.lastIndexOf('.') + 1, posque).toLowerCase();
	ext = in_array(ext, ['mp3', 'wma', 'ra', 'rm', 'ram', 'mid', 'asx', 'wmv', 'avi', 'mpg', 'mpeg', 'rmvb', 'asf', 'mov', 'flv', 'swf']) ? ext : 'x';
	if(ext == 'x') {
		if(/^mms:\/\//.test(mediaUrl)) {
			ext = 'mms';
		} else if(/^(rtsp|pnm):\/\//.test(mediaUrl)) {
			ext = 'rtsp';
		}
	}
	var str = '[media=' + ext + ',' + $(ctrlid + '_param_2').value + ',' + $(ctrlid + '_param_3').value + ']' + squarestrip(mediaUrl) + '[/media]';
	insertText(str, str.length, 0, false, sel);
	break;

Discuz根据主流的视频网站的视频地址格式写的规则,生成discuz专用的[media]标签,在前台输出的时候再解析成embed这样的HTML代码。

解析之后的差不多就是这样:

<embed src="http://player.video.qiyi.com/7b42a1a27ff121c201ee5e6c6d757817/0/0/v_19rrklq2bs.swf-albumId=406283300-tvId=406283300-isPurchase=2-cnId=1" allowFullScreen="true" quality="high" width="480" height="350" align="middle" allowScriptAccess="always" type="application/x-shockwave-flash"></embed>

而不那么主流的爱奇艺的flash地址则是:”http://player.video.qiyi.com/7b42a1a27ff121c201ee5e6c6d757817/0/0/v_19rrklq2bs.swf-albumId=406283300-tvId=406283300-isPurchase=2-cnId=1”,查看上一段代码可以看到,discuz会用正则去看粘贴的地址最后一个”.“后的后缀,如果这个后缀不在自己的已知flash格式数组中,就把类型设为”x”,也就是生成的标签成了[media=x,500,350]。

再到了前台解析,不认识,直接生成一个a标签。

所以,我的解决办法是:

if(ext == 'x') {
	if(/^mms:\/\//.test(mediaUrl)) {
		ext = 'mms';
	} else if(/^(rtsp|pnm):\/\//.test(mediaUrl)) {
		ext = 'rtsp';
	} else if (mediaUrl.indexOf('player.video.qiyi.com')) {
  		ext = 'swf';	
	}
}

增加一个else if 判断是否包含爱奇艺播放器的域名,当用户粘贴的flash地址包含这个字符串时,就认为是粘贴了一个爱奇艺的视频,存储的格式也就成了[media=swf]。

但是这里其实不是十分严谨,如果一个非法的flash地址很巧合的包含了这个字符串,也会认为是flash了,那这种情况下就会出错了。

PHP RabbitMQ 教程(六) - 远程调用

远程调用

第二节中,我们学习了如何使用工作队列在多个worker中分发耗时任务。

但是如果我们需要在远程运行一个函数并等待返回结果怎么办?这是两码事,这个模式通常被称为远程过程调用(Remote Procedure Call,RPC)。

在本节中我们准备使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务端。由于我们没有值得分发的耗时任务,我们准备创建一个假的返回斐波那契数列的RPC服务。

客户端接口

我们创建一个简单的客户端类来说明RPC服务如何使用。这个类会展示call方法如何发送一个RPC请求并且阻塞,直到接收到返回值。

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo "[.] Got ", $response, "\n";

RPC注意事项

尽管RPC在计算机学中很常见,但它十分挑剔。当程序员不知道是否是调用一个本地的方法还是一个很慢的RPC会出现这个问题。这样的困惑便导致不可预测的系统并增加不必要的调试复杂性。比起简化的软件,误用RPC会导致不可维护的无头绪代码。

记住刚才的内容,考虑下面的建议:

​ 确保可以明显的看出哪个方法调用的是本地的哪个是远程的。

​ 系统文档化。让组件之间的依赖变得清晰可见。

​ 错误处理。当RPC服务长时间关闭客户端该作何反应?

如果有疑问,则尽量避免使用RPC。如果可以话,你应该使用异步管道——而不是RPC——像阻塞,结果被异步推送到下个计算阶段。

回调队列

通常在RabbitMQ上做RPC很简单。客户端发送请求消息,服务端回复消息。为了接收响应消息,我们需要在请求中附带一个”callback”队列地址,我们可以使用默认的队列。来试一试:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$msg = new AMQPMessage(
	$payload,
	array('reply_to' => $queue_name));

$channel->basic_publish($msg, '', 'rpc_queue');

# ... then code to read a response message from the callback_queue

消息属性

AMQP协议定义了14个消息属性。大部分不常用,下面的除外:

​ delivery_mode:值为2时表示持久化,1为临时的。也许你还记得这个属性来自第二节。

​ content_type:编码格式,比如经常用的JSON格式,良好的做法是设置为:application/json。

​ reply_to:通常用来定义回调队列名称。

​ correlation_id:用来关联RPC的响应和请求。

Correlation Id

在上面的方法中我们建议为每一个RPC请求创建一个回调队列。这样非常低效,但是幸运的是有更好的办法 - 我们可以为每一个客户端创建一个单独的回调队列。

这样又带来一个新的问题,在队列接收到响应时,并不知道属于哪个请求。这也正是correlation_id属性要发挥的作用。我们为每一个请求的设定一个唯一的correlation_id值,然后,当在回调队列接收到消息时会查看它的属性,基于此,我们就可以把响应和请求进行匹配。如果发现一个未知的correlation_id值,可以安全的忽略掉这条消息 - 因为它不属于任何请求。

也许你会问,为什么应该忽略回调队列里的未知消息,而不是返回一个错误?因为服务可能会出现紊乱的情况,虽然不太可能,但是如果发生这种情况,RPC服务会在发送完响应后挂掉,但是还没有进行消息确认。如果发生了,重启RPC服务后会再次处理这个请求。这就是为什么在客户端我们必须适当的处理重复请求,而RPC服务最好的幂等的。

总结

RPC工作流程:

​ 当客户端开始运行时会创建一个匿名独有回调队列。

​ RPC请求中,客户端消息带有两个属性:reply_to用来设置回调队列,correlation_id用来唯一标识每一个请求。

​ 请求被发送到rpc_queue队列。

​ RPC worker(又称worker)在队列中守护,等待新请求。当请求到达,它会进行处理,然后把结果以消息的形式发送回客户端的队列,队列名便是客户端消息带有的reply_to的值。

​ 客户端等待回调队列中的数据。当消息到达,检查它的correlation_id的值。如果符合客户端发送给RPC服务器中请求的值,客户端会返回响应内容到应用中。

整合

斐波那契方法:

function fib($n){
  if($n == 0)
  	return 0;
  if($n == 1)
  	return 1;
  return fib($n-1) + fib($n-2);
}

定义完斐波那契方法。假定它仅接受数字类型的输入。(别期望它能处理大的数字,它很可能非常慢的处理完。)

RPC服务处理程序rpc_server.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

fuction fib($n){
  if($n == 0)
    return 0;
  if($n == 1)
    return 1;
  
  return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";

$callback = function($req){
  $n = intval($req->body);
  echo " [.] fib(",$n,")\n";
  
  $msg = new AMQPMessage(
  	(string)fib($n),
    array('correlation_id' => $req->get('correlation_id'))
  );
  
  $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
  $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)){
  $channel->wait();
}

$channel->close();
$connection->close();

?>

服务端代码相当简单:

​ 和以往一样我们会从创建连接,频道,和声明队列开始

​ 也许我们想要运行更多的进程。为了在多个服务器之间负载均衡,需要在$channel.basic_qos中设置prefech_count;

​ 我们使用basic_consume访问队列。然后进入while循环等待请求消息,处理,然后返回响应消息。

RPC客户端 rpc_client.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class FibonacciRpcClient{
	private $connection;
	private $channel;
	private $callback_queue;
	private $response;
	private $corr_id;
  
	public function __construct(){
		$this->connection = new AMQPStreamConnection(
		  'localhost', 5672, 'guest', 'guest'
		);
		$this->channel = $this->connection->channel();
		list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);
		$this->channel->basic_consume(
			$this->callback_queue, '', false, false, false, false,
			array($this, 'on_response'));
	}
  
 	public function on_response($req){
 		if($req->get('correlation_id') == $this->corr_id){
 			$this->response = $req->body;
 		}
 	}
  	
	public function call($n){
		$this->response = null;
		$this->corr_id = uniqid();

		$msg = new AMQPMessage(
  		(string) $n,
  		array(
  			'correlation_id' => $this->corr_id,
  			'reply_to' => $this->callback_queue)
  		);
    )
      
    $this->channel->basic_publish($msg, '', 'rpc_queue');
    while(!$this->response){
    	$this->channel->wait();
    }
    return intval($this->response);
  }
};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

?>

现在可以查看示例的完整代码了。rpc_client.phprpc_server.php

现在RPC服务端可以运行了:

php rpc_server.php
[x] Awiting RPC resquests

接收斐波那契数列运行:

php rpc_client.php
[x] Requesting fib(30)

这里展现的并不是RPC服务的唯一可能实现,但它有一些重要的优势:

​ 如果RPC服务太慢,可以按比例增加运行数量。试试在新控制台裕兴第二个rpc_server.php服务。

​ 在客户端,RPC要求只发送和接收一条消息。不能有像队列声明一样的异步调用。结果就是,对于单一的RPC请求,客户端仅需要一个网络往返。

现在的代码还是过于简单,并没有想解决更复杂(更重要)的问题,比如:

​ 要是没有服务端守护运行,客户端作何反应?

​ RPC客户端是否需要设置超时?

​ 如果服务端引发异常,是否该把它发送到客户端?

​ 处理前阻止无效消息(如检查范围,类型)进入?

如果想尝试,可以在rabbitmq-management 里找到一些有用的查看队列的插件。

原文地址:Remote procedure call (RPC)

PHP RabbitMQ 教程(五) - 主题

主题

(使用php-amqplib)

上一节我们改善了日志系统(logging system,以下简称日志系统),为了替代fanout类型的交换器,我们使用了一个direct类型的交换器,带来的好处是可以有选择的接收日志。

虽然使用direct交换器改善了系统,但是仍然有局限性 - 它不能根据多个条件进行路由。

在日志系统中,我们也许不仅仅想订阅严重等级的日志,也想订阅基于消息发布源的内容。也许你已经知道这个概念来自于UNIX的syslog工具,基于严重性(info/warn/crit…)和设备路由日志(auth/cron/kern…)的工具。

这可以提高灵活性 - 我们也许只想要监听来自’cron’的关键错误而不是来自’kern’的全部日志。

为了在我们的日志系统上实现这个功能,需要学习一个更复杂的 topic 交换器。

Topic 交换器

发送到 topic 交换器的消息不能随意设置 routing_key - 它必须是一个单词列表,以’.‘分隔。单词可以是任何内容,但是通常会具体说明消息的功能。一些有效的routing key示例:样:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。routing key 可以是任何长度的你喜欢的单词,最大255个字节。

binding key 也必须是同样的格式,topic交换器的逻辑和direct交换器类似 - 带有特定routing key的消息会被派发到所有绑定了binding key的队列,然而对于binding key依然有两个重要的特殊情况:

*可以代替一个单词

#可以代替0个或多个单词

下图比较好的解释了这个情况:

在这个例子中,我们准备全部发送描述动物的消息。这些消息带有由三个单词(两个点号分隔)组成的routing key,其中第一个单词表示速度,第二个表示颜色,第三个表示种类:”..“。

我们创建三个绑定:Q1的binding key为”*.orange”,Q2的binding key为”*.*.rabbit”和”lazy.#“。

这些绑定可以概括为:

Q1 关注所有orange的动物

Q2 想知道所有关于兔子(rabbits)和懒惰动物(lazy animals)的消息。

routing key为”quick.orange.rabbit”的消息会被发送到两个队列,”lazy.orange.elephant”也会被发送到这两个队列。而”quick.orange.fox”则只会发送到第一个队列,”lazy.brown.fox”会被只发送到第二个队列。”lazy.pink.rabbit”会只被发送到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox”不匹配任何绑定,所以会被丢弃。

如果打破规则,发送一条带有一个或四个单词,如”orange”或”quick.orange.male.rabbit”会怎么样?好吧,消息会丢失,因为它不匹配任何一个绑定。

但是,”lazy.orange.male.rabbit”这种消息,即使它有4个单词,依然会匹配最后一个绑定,然后被发送到第二个队列。

topic 交换器

topic 交换器非常强大,可以表现得跟其他交换器一样。

当一个队列的binding key为"#"时,它会接收所有消息,忽略routing key,像fanout交换器一样。

当绑定中不存在"*"和"#"时,topic交换器会表现的跟direct交换器一样。

整合

我们准备在日志系统中使用topic交换器。假定日志的routing key由两个单词:”.“组成。

代码与上一节的几乎一致。

emit_log_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($agrv, 2));
if(empty($data)) $data = "Hello Wrold!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ",$routing_key,':',$data," \n";

$channel->close();
$connection->close();

?>

receive_logs_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys) ){
  file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
  exit(1);
}

foreach($binding_keys as $binding_key){
  $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [*] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
}

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)){
  $channel->wait();
}

$channel->close();
$connection->close();

?>

接收所有日志:

php receive_logs_topic.php '#'

接收所有来自”kern”的日志:

php receive_logs_topic.php "kern.*"

只接收”致命(critical)“日志:

php receive_logs_topic.php "*.critical"

创建多个绑定:

php receive_logs_topic.php "kern.*" "*.critical"

发布routing key为”kern.critical”的日志就输入:

php emit_log_topic.php "kern.critical" "A critical kernel error"

注意此代码并没有做路由或捆绑的例子,也许你想试一下两个以上的routing key参数。

一些问题:

”*“会匹配routing key为空的消息吗?

”#.*“会匹配内容为”..“的消息吗?会匹配一个单词的消息吗?

“a.*.#“和”a.#“的区别是什么?

emit_log_topic.php完整代码 receive_logs_topic.php完整代码

下一步,在第六节中学习像远程过程调用一样完成消息往返。

原文地址:Topics