博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
How it works(2) autocannon源码阅读(A)
阅读量:4302 次
发布时间:2019-05-27

本文共 9781 字,大约阅读时间需要 32 分钟。

autocannon是纯node实现的接口压力测试工具,市面上类似的产品很多,老牌的AB,带有图形界面的soap ui等.不过autocannon可以方便的进行命令行调用,甚至在代码内调用,这对于nodejs项目的单元测试来说是相当方便的.

下面就来简单分析一下他的源码.

综述

上一篇我对winston框架做了源码分析,其核心关键词是流,通过流将模块链接起来.

而对于autocannon,关键词是事件,无数的事件传递是模块之间产生关系的桥梁.

代码架构

用生成的结构图.

autocannon.js

上面说到,autocannon可以方便的进行命令行调用.全局安装时,它就是一个命令行工具,因此需要接收运行参数.

引用了minimist这个库使得它能像其他命令行工具一样,既能接收完整参数,也可以接收别名参数.

nodejs同python一样,文件既可以是执行的主体,也可以是供其他应用调用的模块.这里使用了一个官方文档的小技巧,区分autocannon到底是被当做命令行工具还是被当做模块来调用.

When a file is run directly from Node, require.main is set to its module.

if (require.main === module) {
const argv = crossArgv(process.argv.slice(2)) start(parseArguments(argv))}

作为程序的入口,autocannon.js一半的篇幅用于处理运行参数的初始化与规范化.

保证输入的参数的规范后,便开始按参数运行整个程序.
运行程序的方法是很简单的:

const run = require('./lib/run')const track = require('./lib/progressTracker')...function runTracker (argv, ondone) {
//按参数执行压测 const tracker = run(argv) tracker.on('done', (result) => {
if (ondone) ondone() if (argv.json) {
console.log(JSON.stringify(result)) } }) tracker.on('error', (err) => {
if (err) {
throw err } }) //绑定到实时进度显示 if (!argv.json || !process.stdout.isTTY) track(tracker, argv) process.once('SIGINT', () => {
tracker.stop() })}

不过,在运行前,autocannon还对一种特殊参数做了处理.当参数里包含’–on-port’项时,可以追加一个参数’–’.

'–'后面跟随的命令,会在监听端口时执行.这个方法一般用于单元测试.

if (argv.onPort) {
//兼容性判断,需要node 8+支持的异步钩子 if (!hasAsyncHooks()) {
console.error('The --on-port flag requires the async_hooks builtin module') process.exit(1) } //利用进程间通信创建一个自定义服务启动检测器 const {
socketPath, server } = createChannel((port) => {
//自定义的服务会传回自己的端口 const url = new URL(argv.url, `http://localhost:${
port}`).href const opts = Object.assign({
}, argv, {
onPort: false, url: url }) //按照参数执行压测 runTracker(opts, () => {
proc.kill('SIGINT') server.close() }) }) //将预加载项加入环境变量的PATH中去 const alterPath = managePath({
PATH: process.env.NODE_PATH }) alterPath.unshift(path.join(__dirname, 'lib/preload')) //argv.spawn[]此时是命令'--'后面跟随的命令,用于执行指定的额外自定义服务 //这个服务需要实现通过AUTOCANNON_SOCKET,将自己的端口告诉给autocannon的功能 //启动这个自定义服务前在服务里加载这个预加载模块autocannonDetectPort const proc = spawn(argv.spawn[0], argv.spawn.slice(1), {
stdio: ['ignore', 'inherit', 'inherit'], env: Object.assign({
}, process.env, {
NODE_OPTIONS: ['-r', 'autocannonDetectPort'].join(' ') + (process.env.NODE_OPTIONS ? ` ${
process.env.NODE_OPTIONS}` : ''), NODE_PATH: alterPath.get(), AUTOCANNON_SOCKET: socketPath }) }) }function createChannel (onport) {
const pipeName = `${
process.pid}.autocannon` //windows和*nix对于localsocket的不同调用位置 const socketPath = process.platform === 'win32' ? `\\\\?\\pipe\\${
pipeName}` : path.join(os.tmpdir(), pipeName) const server = net.createServer((socket) => {
//socket一旦连接并返回所启动的服务的端口,就开始压测 socket.once('data', (chunk) => {
const port = chunk.toString() onport(port) }) }) //监听localsocket server.listen(socketPath) server.on('close', () => {
try {
fs.unlinkSync(socketPath) } catch (err) {
} }) return {
socketPath, server }}

要看明白autocannon调用预加载模块的目的,就不得不看一下这个预加载模块autocannonDetectPort到底写了什么:

//当app监听了某个端口,给予提醒const onListen = require('on-net-listen')const net = require('net')//获取IPC的unix socketconst socket = net.connect(process.env.AUTOCANNON_SOCKET)//将端口信息通过IPC传递给autocannononListen(function (addr) {
this.destroy() const port = Buffer.from(addr.port + '') socket.write(port)})//关闭socketsocket.unref()

至此,一切都明朗了:

对于某些需要一定初始化时间的服务,不能在服务刚一启动就进行压测,需要等待其初始化完成,开始监听端口时再进行.对于这种情况,就需要–on-port这个运行标志了.
因此’–on-port’标志需要和’–‘参数一起使用,’–‘参数后面跟着需要压测的服务,将’–on-port’标志设为true后,就可以实现等待初始化完成.
autocannon处理是这样的:

  1. 当’–on-port’标志打开时,autocannon运行一个小的IPC服务,用于接收待测服务初始化完成后传回来的端口号,

  2. 将’autocannonDetectPort’这个检测端口的预加载模块(也是一个IPC客户端)的路径写入环境变量,用于后期调用.

  3. 正式运行待测服务时,通过 -r 参数进行预加载,将’autocannonDetectPort’嵌入待测服务中.

  4. 等待待测服务初始化完毕,开始监听端口,'autocannonDetectPort’将监听的端口发送回IPC服务端autocannon主程序.

  5. autocannon接收到端口后视为初始化完毕,开始进行压测.当然,如果已经给autocannon传入了端口,则此次IPC传回端口只作为一个服务初始化完成的标志,不使用该传回的端口.

这里的巧妙点就是这个命令了,不用待测服务修改源文件,没有侵入性.

progressTracker.js

progressTracker是以命令行的方式展示进度的模块,因此加载了许多的处理显示效果的库.

本质上是对autocannon的各种事件的监听,并展示相应结果.

lib/run.js

run模块是整个autocannon的核心,催动着整个程序的运行.

run模块主要处理了两件事:

  • 初始化所有的客户端.
  • 定时统计/监视实时进度.

先说说初始化部分:

run模块引用了直方图库hdr-histogram-js用于记录处理统计结果.
在autocannon中,发起请求的根源是client,而压力的测试也更多来自于client的多少以及持续的时间/总量.
client本质上是eventemitter的继承,因此,run模块监听了client上诸如’respone’,‘timeout’,'error’等事件,对不同事件作出相应.
大多数响应方法是用于统计计数的.同时,run模块本身也继承了eventemitter,在接受到各种事件时也会转发事件.

运行时有一个重要的标志:stop标志.

autocannon有几种情况下会stop:
正常情况下:

  1. 请求达到了预定的请求数,stop标志变为true.
  2. 求情发送持续时长到达预定时长,stop标志变为true.
    非正常情况下:
  3. 错误数+超时数超过了阈值(如果有的话),stop标志变为true.

为了不占用资源,并没有添加一个对stop的监听,也就是是说,stop变为true时并不会立即停止.

处理stop的位置是在run模块的另一个部分,定时统计/监视进度的函数:
当run模块初始化时,会启动一个定时器,每一秒收集一次压测的各项统计计数.这样是合理的设计,因为既然是压测,其短时间内反馈相应变化非常大,没有必要每次有变化就立即相应,即将收集数据的方法绑定到各个client的相应事件上.基于性能的要求也不应如此.
在每秒收集数据时也检查一下stop标志是否为true.若为true,则将client尽数销毁,各计时器也一并清理,同时激发’完成’事件,将统计结果通过事件传递出去.

#lib/httpClient.js

httpClient是发出请求的主体,若干httpClient给待测服务施加了压力.
我觉得httpClient是整个程序的精华所在,httpClient并没有用request或http.request这种封装好的模块,它通过一些底层的编写socket构建请求让我们了解到一些因为频繁使用koa,express等现有框架而忽视的细节:

Client.prototype._connect = function () {
//是否是安全传输 if (this.secure) {
//IPC模式下使用指定的unix socket来连接 if (this.ipc) {
this.conn = tls.connect(this.opts.socketPath, {
rejectUnauthorized: false }) } else {
//建立tlssocket this.conn = tls.connect(this.opts.port, this.opts.hostname, {
rejectUnauthorized: false, servername: this.opts.servername }) } } else {
if (this.ipc) {
this.conn = net.connect(this.opts.socketPath) } else {
//建立普通socket this.conn = net.connect(this.opts.port, this.opts.hostname) } } //遇到错误重新建立socket this.conn.on('error', (error) => {
this.emit('connError', error) if (!this.destroyed) this._connect() }) //接受到信息时进行的处理i this.conn.on('data', (chunk) => {
this.resData[0].bytes += chunk.length this.parser.execute(chunk) }) //一次请求结束,接着重复执行请求 this.conn.on('end', () => {
if (!this.destroyed) this._connect() }) this._doRequest(0)}

同时定义了请求的发送与socket的销毁:

Client.prototype._doRequest = function (rpi) {
//是否超过频率限制 if (!this.rate || (this.rate && this.reqsMadeThisSecond++ < this.rate)) {
if (!this.destroyed && this.responseMax && this.reqsMade >= this.responseMax) {
return this.destroy() } this.emit('request') //记录高精度的时间,,用来获取纳秒级别的时间差 this.resData[rpi].startTime = process.hrtime() //发送请求 this.conn.write(this.requestIterator.move()) //重置超时记录器 this.timeoutTicker.reschedule(this.timeout) } else {
this.paused = true }}Client.prototype._destroyConnection = function () {
//清除所有监听 this.conn.removeAllListeners('error') this.conn.removeAllListeners('end') this.conn.on('error', () => {
}) //销毁socket this.conn.destroy()}Client.prototype.destroy = function () {
if (!this.destroyed) {
this.destroyed = true this.timeoutTicker.clear() if (this.rate) clearInterval(this.rateInterval) this.emit('done') this._destroyConnection() }}

这样做的好处自然是更灵活的定制细节.

其中有一个对请求返回处理的细节,在连接时,有这样一段代码:

this.conn.on('data', (chunk) => {
this.resData[0].bytes += chunk.length this.parser.execute(chunk) })

这里的parser是HTTPParser,是一个内置的c/c++模块,用于解析出respones的请求体,本身是基于事件绑定对不同阶段进行不同处理.

在解析过程中,经历了以下事件:

  • kOnHeaders:不断解析获取的请求头
  • kOnHeadersComplete:请求头解析完毕
  • kOnBody:不断解析获取的请求体
  • kOnMessageComplete:请求体解析完毕
  • kOnExecute:一次解析完毕 ( 无法一次性接收 HTTP 报文的情况 )

在httpClient中,对这些阶段进行了绑定:

this.parser = new HTTPParser(HTTPParser.RESPONSE)this.parser[HTTPParser.kOnHeaders] = () => {
}this.parser[HTTPParser.kOnHeadersComplete] = (opts) => {
this.emit('headers', opts) this.resData[0].headers = opts}//监听了'body'事件就可以获取到body的数据this.parser[HTTPParser.kOnBody] = (body) => {
this.emit('body', body)}this.parser[HTTPParser.kOnMessageComplete] = () => {
//获取高精度的时间,精确计算一个请求的花费 let end = process.hrtime(this.resData[0].startTime) let responseTime = end[0] * 1e3 + end[1] / 1e6 //返回此次请求结果 this.emit('response', this.resData[0].headers.statusCode, this.resData[0].bytes, responseTime) this.resData[0].bytes = 0 if (!this.destroyed && this.reconnectRate && this.reqsMade % this.reconnectRate === 0) {
return this._resetConnection() } //一次请求结束后立即再开始一次请求 this._doRequest(0)}

如果我们用request等现有库发送大量请求,是会经常出现各种原因导致nodejs崩溃的.理论来说一台主机一般可以维持65535个连接,而nodejs维持个1万+的连接也没问题.但是使用request等框架,很可能发送上百上千的请求就挂掉了.其中的原因可能是因为各种资源的泄露或占用没有被释放.也可能是socket资源没有充分利用起来,每次都建立新对象,旧的又没有及时被回收,造成socket资源紧张.

面对这种情况,一般是需要限制发送频,保证处理速度,但这对于一个压测软件来说,是致命的.
从底层编写的请求发送体,不仅可以在精确的节点获取精确的信息,还可以控制请求发送方方面面的细节,尽可能避免资源泄露,能重复使用的就重复使用,保证了稳定性.
至此,lib/httpClient模块发送请求的流程基本上是这样的:

lib/requestIterator.js和lib/httpRequestBuilder

请求迭代器存在的意义在于请求体可以随着请求而变化,这对无缓存压测很有意义.

requestIterator本身是一个有限循环数组的迭代,在给定的请求体内数组不断的循环,供httpclient发送请求.

RequestIterator.prototype.move = function () {
//返回当前的请求体 let ret = this.currentRequest.requestBuffer //将当前请求体的指针指向请求体数组内下一个请求体 this.nextRequest() //他还有随机生成id的功能 return this.reqDefaults.idReplacement ? Buffer.from(ret.toString().replace(/\[
\]/g, hyperid())) : ret}

因为是socket直接发送请求,所以发送的请求体需要经过httpRequestBuilder,将用户定义的请求参数最终处理成Buffer形式.

总结

要说autocannon最有价值学习的地方,我觉得应该是其从底层入手构建的请求模块.对细节的把控支持其能短时间发送大量请求.

autocannon可以控制同时worker的数量,当前常见的限流/限频的流程控制方法也有控制worker数量的功能,其中有何不同呢?
以朴灵大牛的为例,bagpipe是比较有名的并行控制模块,他与autocannon的理念正好相反,是为了限制发送请求的速度,保证若干个下载结束后再进行下若干个请求.
因此,这两者的差异就很明确了:他们的目的和要处理的任务不同.

  • autocannon注重过程,bagpipe注重结果.如果用bagpipe写autocannon的流程的话,autocannon就会变成一个资源下载器.
  • 从根本上来说,autocannon是若干个worker去干一个任务,而bagpipe则是一个worker干若干个任务.对于有序的,有限的,复杂的任务,多worker形式相比单worker形式并不适宜.而autocannon所面临的任务,正好是无序,简单,无限的.

转载地址:http://ygqws.baihongyu.com/

你可能感兴趣的文章
Docker(一)使用阿里云容器镜像服务
查看>>
Docker(二) 基础命令
查看>>
Docker(三) 构建镜像
查看>>
Spring 全家桶注解一览
查看>>
JDK1.8-Stream API使用
查看>>
cant connect to local MySQL server through socket /tmp/mysql.sock (2)
查看>>
vue中的状态管理 vuex store
查看>>
Maven之阿里云镜像仓库配置
查看>>
Maven:mirror和repository 区别
查看>>
微服务网关 Spring Cloud Gateway
查看>>
SpringCloud Feign的使用方式(一)
查看>>
SpringCloud Feign的使用方式(二)
查看>>
关于Vue-cli+ElementUI项目 打包时排除Vue和ElementUI
查看>>
Vue 路由懒加载根据根路由合并chunk块
查看>>
vue中 不更新视图 四种解决方法
查看>>
MySQL 查看执行计划
查看>>
OpenGL ES 3.0(四)图元、VBO、VAO
查看>>
OpenGL ES 3.0(五)纹理
查看>>
OpenGL ES 3.0(八)实现带水印的相机预览功能
查看>>
OpenGL ES 3.0(九)实现美颜相机功能
查看>>