test.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. 'use strict'
  2. /* eslint-disable no-var */
  3. var test = require('tape')
  4. var buildQueue = require('../')
  // Asserts that building a queue throws for a concurrency of 0 and does not
  // throw for a concurrency of 1.
  test('concurrency', function (t) {
    t.plan(2)

    // Trivial worker; it only has to exist so the queue can be constructed.
    function worker (arg, cb) {
      cb(null, true)
    }

    var buildWithZero = buildQueue.bind(null, worker, 0)
    var buildWithOne = buildQueue.bind(null, worker, 1)

    t.throws(buildWithZero)
    t.doesNotThrow(buildWithOne)
  })
  // Pushes a single task and checks that the worker receives it and that the
  // push callback receives the value the worker reported.
  test('worker execution', function (t) {
    t.plan(3)

    // Worker: verifies the task payload, then completes synchronously.
    function worker (task, release) {
      t.equal(task, 42)
      release(null, true)
    }

    // Completion callback for the pushed task.
    function onDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
    }

    var q = buildQueue(worker, 1)
    q.push(42, onDone)
  })
  // With a concurrency of 1, results must arrive in push order even though the
  // first task (10 ms timer) takes longer than the second (0 ms timer).
  test('limit', function (t) {
    t.plan(4)

    // Results in the order they are expected to be delivered.
    var pending = [10, 0]

    // Worker: completes after `delay` milliseconds, echoing the delay back.
    function worker (delay, cb) {
      setTimeout(cb, delay, null, delay)
    }

    function result (err, value) {
      t.error(err, 'no error')
      t.equal(value, pending.shift(), 'the result matches')
    }

    var q = buildQueue(worker, 1)
    q.push(10, result)
    q.push(0, result)
  })
  // Pushes five tasks up front and checks that each one reaches the worker in
  // order and that each callback sees the result of the task started last.
  test('multiple executions', function (t) {
    t.plan(15)

    var tasks = [1, 2, 3, 4, 5]
    // Number of tasks the worker has started so far.
    var started = 0

    // Worker: the incoming task must be the next one in `tasks`.
    function worker (task, cb) {
      t.equal(task, tasks[started], 'arg matches')
      started++
      setImmediate(cb, null, task)
    }

    function done (err, result) {
      t.error(err, 'no error')
      // `started` was already incremented by the worker, hence the -1.
      t.equal(result, tasks[started - 1], 'the result matches')
    }

    var q = buildQueue(worker, 1)
    for (var i = 0; i < tasks.length; i++) {
      q.push(tasks[i], done)
    }
  })
  // Same as 'multiple executions', but each task is pushed from the completion
  // callback of the previous one rather than all at once.
  test('multiple executions, one after another', function (t) {
    t.plan(15)

    var tasks = [1, 2, 3, 4, 5]
    // Number of tasks the worker has started so far.
    var started = 0

    // Worker: the incoming task must be the next one in `tasks`.
    function worker (task, cb) {
      t.equal(task, tasks[started], 'arg matches')
      started++
      setImmediate(cb, null, task)
    }

    function done (err, result) {
      t.error(err, 'no error')
      t.equal(result, tasks[started - 1], 'the result matches')

      // Stop chaining once every task has been handed to the worker.
      if (started >= tasks.length) {
        return
      }
      q.push(tasks[started], done)
    }

    var q = buildQueue(worker, 1)
    q.push(tasks[0], done)
  })
  // When a context object is passed as the first argument to buildQueue, both
  // the worker and the push callback must be invoked with it as `this`.
  test('set this', function (t) {
    t.plan(3)

    var context = {}

    // Plain function declarations (not arrows) so `this` is the call-site value.
    function worker (arg, cb) {
      t.equal(this, context, 'this matches')
      cb(null, true)
    }

    function onDone (err, result) {
      t.error(err, 'no error')
      t.equal(this, context, 'this matches')
    }

    var q = buildQueue(context, worker, 1)
    q.push(42, onDone)
  })
  // Checks that the `drain` hook is invoked, and that by then the worker has
  // already processed the task.
  test('drain', function (t) {
    t.plan(4)

    // Flipped by the worker so the drain hook can verify work happened first.
    var didWork = false

    // Worker: completes asynchronously, so `drain` can be assigned after push.
    function worker (task, cb) {
      t.equal(task, 42)
      didWork = true
      setImmediate(cb, null, true)
    }

    function onDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
    }

    var q = buildQueue(worker, 1)
    q.push(42, onDone)

    q.drain = function () {
      t.equal(true, didWork, 'drained')
    }
  })
  // A task pushed while the queue is paused must not reach the worker until
  // resume() is called; `paused` must reflect the state at each step.
  test('pause && resume', function (t) {
    t.plan(7)

    // Set by the worker; must still be false while the queue is paused.
    var didWork = false

    function worker (task, cb) {
      t.equal(task, 42)
      didWork = true
      cb(null, true)
    }

    function onDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
    }

    var q = buildQueue(worker, 1)

    t.notOk(q.paused, 'it should not be paused')

    q.pause()
    q.push(42, onDone)

    t.notOk(didWork, 'it should be paused')
    t.ok(q.paused, 'it should be paused')

    q.resume()
    q.resume() // second resume is a no-op

    t.notOk(q.paused, 'it should not be paused')
  })
  // Pauses the queue while the first task is already running: the first task
  // still completes (observing `paused`), and the second only runs after the
  // queue is resumed from the first task's callback.
  test('pause in flight && resume', function (t) {
    t.plan(9)

    // Tasks in the order the worker is expected to receive them.
    var upcoming = [42, 24]

    // Worker: completes on the next tick so pause() below lands mid-flight.
    function worker (task, cb) {
      t.equal(task, upcoming.shift())
      process.nextTick(function () { cb(null, true) })
    }

    function firstDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
      t.ok(q.paused, 'it should be paused')
      process.nextTick(function () { q.resume() })
    }

    function secondDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
      t.notOk(q.paused, 'it should not be paused')
    }

    var q = buildQueue(worker, 1)

    t.notOk(q.paused, 'it should not be paused')

    q.push(42, firstDone)
    q.push(24, secondDone)

    q.pause()
  })
  // Raises `concurrency` from 1 to 2 while paused and checks that, after
  // resume(), both queued tasks are running at the same time.
  test('altering concurrency', function (t) {
    t.plan(7)

    // Number of tasks that have finished; must be 0 whenever a worker starts.
    var finished = 0

    function worker (task, cb) {
      t.equal(0, finished, 'works in parallel')
      setImmediate(function () {
        finished++
        cb(null, true)
      })
    }

    function workDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
    }

    var q = buildQueue(worker, 1)

    q.pause()
    q.push(24, workDone)
    q.push(24, workDone)

    q.concurrency = 2
    q.resume()

    t.equal(q.running(), 2, '2 jobs running')
  })
  // Tracks idle() across the queue's lifetime: idle before any push, not idle
  // while tasks are queued or running, idle again after the last callback.
  test('idle()', function (t) {
    t.plan(12)

    function worker (task, cb) {
      t.notOk(q.idle(), 'queue is not idle')
      t.equal(task, 42)
      setImmediate(cb, null, true)
    }

    function firstDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
      t.notOk(q.idle(), 'queue is not idle')
    }

    function lastDone (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
      // it will go idle after executing this function
      setImmediate(function () {
        t.ok(q.idle(), 'queue is now idle')
      })
    }

    var q = buildQueue(worker, 1)

    t.ok(q.idle(), 'queue is idle')

    q.push(42, firstDone)
    q.push(42, lastDone)

    t.notOk(q.idle(), 'queue is not idle')
  })
  // Checks that the `saturated` hook fires when one task has started but none
  // has completed yet.
  test('saturated', function (t) {
    t.plan(9)

    // Tasks handed to the worker so far.
    var startedCount = 0
    // Tasks whose asynchronous work has finished.
    var finishedCount = 0

    function worker (task, cb) {
      t.equal(task, 42)
      startedCount++
      setImmediate(function () {
        finishedCount++
        cb(null, true)
      })
    }

    function done (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
    }

    var q = buildQueue(worker, 1)

    q.saturated = function () {
      t.pass('saturated')
      t.equal(startedCount, 1, 'started 1 task')
      t.equal(finishedCount, 0, 'worked zero task')
    }

    q.push(42, done)
    q.push(42, done)
  })
  // length() must count only waiting tasks: the first pushed task starts
  // running straight away and is therefore not included.
  test('length', function (t) {
    t.plan(7)

    // Worker: completes asynchronously so tasks pile up behind the first one.
    function worker (task, cb) {
      setImmediate(function () {
        cb(null, true)
      })
    }

    function done (err, result) {
      t.error(err, 'no error')
    }

    var q = buildQueue(worker, 1)

    t.equal(q.length(), 0, 'nothing waiting')

    q.push(42, done)
    t.equal(q.length(), 0, 'nothing waiting')

    q.push(42, done)
    t.equal(q.length(), 1, 'one task waiting')

    q.push(42, done)
    t.equal(q.length(), 2, 'two tasks waiting')
  })
  // getQueue() must return the waiting task values in order, excluding the
  // task that is currently running.
  test('getQueue', function (t) {
    t.plan(10)

    // Worker: completes asynchronously so later pushes stay in the queue.
    function worker (task, cb) {
      setImmediate(function () {
        cb(null, true)
      })
    }

    function done (err, result) {
      t.error(err, 'no error')
    }

    var q = buildQueue(worker, 1)

    t.equal(q.getQueue().length, 0, 'nothing waiting')

    q.push(42, done)
    t.equal(q.getQueue().length, 0, 'nothing waiting')

    q.push(42, done)
    t.equal(q.getQueue().length, 1, 'one task waiting')
    t.equal(q.getQueue()[0], 42, 'should be equal')

    q.push(43, done)
    t.equal(q.getQueue().length, 2, 'two tasks waiting')
    t.equal(q.getQueue()[0], 42, 'should be equal')
    t.equal(q.getQueue()[1], 43, 'should be equal')
  })
  // unshift() must place tasks at the front of the waiting list: pushing 1 and
  // 4, then unshifting 3 and 2, has to yield the worker order 1, 2, 3, 4.
  test('unshift', function (t) {
    t.plan(8)

    // Order in which the worker is expected to receive the tasks.
    var order = [1, 2, 3, 4]

    function worker (task, cb) {
      t.equal(order.shift(), task, 'tasks come in order')
      setImmediate(function () {
        cb(null, true)
      })
    }

    function done (err, result) {
      t.error(err, 'no error')
    }

    var q = buildQueue(worker, 1)

    q.push(1, done)
    q.push(4, done)
    q.unshift(3, done)
    q.unshift(2, done)
  })
  // After unshift() + resume(), the `empty` hook must fire before the task's
  // completion callback has run.
  test('unshift && empty', function (t) {
    t.plan(2)

    // Set by the completion callback; must still be false inside `empty`.
    var finished = false

    function worker (task, cb) {
      setImmediate(function () {
        cb(null, true)
      })
    }

    function done (err, result) {
      finished = true
      t.error(err, 'no error')
    }

    var q = buildQueue(worker, 1)

    q.pause()

    q.empty = function () {
      t.notOk(finished, 'the task has not completed yet')
    }

    q.unshift(1, done)
    q.resume()
  })
  // After push() + resume(), the `empty` hook must fire before the task's
  // completion callback has run.
  test('push && empty', function (t) {
    t.plan(2)

    // Set by the completion callback; must still be false inside `empty`.
    var finished = false

    function worker (task, cb) {
      setImmediate(function () {
        cb(null, true)
      })
    }

    function done (err, result) {
      finished = true
      t.error(err, 'no error')
    }

    var q = buildQueue(worker, 1)

    q.pause()

    q.empty = function () {
      t.notOk(finished, 'the task has not completed yet')
    }

    q.push(1, done)
    q.resume()
  })
  // After kill(): only the already-started task reaches the worker, the custom
  // drain hook is never invoked, nothing is left queued or running, and
  // `queue.drain` is back to the value it had before the test replaced it.
  test('kill', function (t) {
    t.plan(5)

    // Only task 1 is expected to reach the worker.
    var order = [1]

    function worker (task, cb) {
      t.equal(order.shift(), task, 'tasks come in order')
      setImmediate(function () {
        cb(null, true)
      })
    }

    function done (err, result) {
      t.error(err, 'no error')
      // Inspect the queue on a later turn, after this callback has returned.
      setImmediate(function () {
        t.equal(q.length(), 0, 'no queued tasks')
        t.equal(q.running(), 0, 'no running tasks')
        t.equal(q.drain, originalDrain, 'drain is back to default')
      })
    }

    var q = buildQueue(worker, 1)
    // Captured before the override so the reset can be verified afterwards.
    var originalDrain = q.drain

    q.drain = function drain () {
      t.fail('drain should never be called')
    }

    q.push(1, done)
    q.push(4, done)
    q.unshift(3, done)
    q.unshift(2, done)
    q.kill()
  })
  // After killAndDrain(): only the already-started task reaches the worker, the
  // custom drain hook is invoked, nothing is left queued or running, and
  // `queue.drain` is back to the value it had before the test replaced it.
  test('killAndDrain', function (t) {
    t.plan(6)

    // Only task 1 is expected to reach the worker.
    var order = [1]

    function worker (task, cb) {
      t.equal(order.shift(), task, 'tasks come in order')
      setImmediate(function () {
        cb(null, true)
      })
    }

    function done (err, result) {
      t.error(err, 'no error')
      // Inspect the queue on a later turn, after this callback has returned.
      setImmediate(function () {
        t.equal(q.length(), 0, 'no queued tasks')
        t.equal(q.running(), 0, 'no running tasks')
        t.equal(q.drain, originalDrain, 'drain is back to default')
      })
    }

    var q = buildQueue(worker, 1)
    // Captured before the override so the reset can be verified afterwards.
    var originalDrain = q.drain

    q.drain = function drain () {
      t.pass('drain has been called')
    }

    q.push(1, done)
    q.push(4, done)
    q.unshift(3, done)
    q.unshift(2, done)
    q.killAndDrain()
  })
  // Checks how idle() interacts with pause/resume: a paused queue holding a
  // task is not idle, it stays non-idle right after resume(), and it becomes
  // idle once the task's completion callback has run.
  test('pause && idle', function (t) {
    t.plan(11)
    var queue = buildQueue(worker, 1)
    // Set by the worker; must still be false while the queue is paused.
    var worked = false

    t.notOk(queue.paused, 'it should not be paused')
    t.ok(queue.idle(), 'should be idle')

    queue.pause()

    queue.push(42, function (err, result) {
      t.error(err, 'no error')
      t.equal(result, true, 'result matches')
    })

    t.notOk(worked, 'it should be paused')
    t.ok(queue.paused, 'it should be paused')
    t.notOk(queue.idle(), 'should not be idle')

    queue.resume()

    t.notOk(queue.paused, 'it should not be paused')
    t.notOk(queue.idle(), 'it should not be idle')

    function worker (arg, cb) {
      t.equal(arg, 42)
      worked = true
      // Order matters: the completion callback is scheduled first, so the
      // idle check scheduled after it runs once the task has been released.
      process.nextTick(cb.bind(null, null, true))
      process.nextTick(function () {
        // Message fixed: previously read 'is should be idle'.
        t.ok(queue.idle(), 'it should be idle')
      })
    }
  })
  // push() must work when no completion callback is supplied: the worker still
  // receives the task and can call its `cb` without arguments.
  test('push without cb', function (t) {
    t.plan(1)

    function worker (task, cb) {
      t.equal(task, 42)
      cb()
    }

    var q = buildQueue(worker, 1)
    q.push(42)
  })
  // unshift() must work when no completion callback is supplied: the worker
  // still receives the task and can call its `cb` without arguments.
  test('unshift without cb', function (t) {
    t.plan(1)

    function worker (task, cb) {
      t.equal(task, 42)
      cb()
    }

    var q = buildQueue(worker, 1)
    q.unshift(42)
  })
  // When the worker reports an error through its callback, both the handler
  // registered via q.error() and the push callback must receive that error;
  // the q.error() handler also receives the task value.
  test('push with worker throwing error', function (t) {
    t.plan(5)

    // Worker: always completes with an error and a null result.
    function failingWorker (task, cb) {
      cb(new Error('test error'), null)
    }

    function onQueueError (err, task) {
      t.ok(err instanceof Error, 'global error handler should catch the error')
      t.match(err.message, /test error/, 'error message should be "test error"')
      t.equal(task, 42, 'The task executed should be passed')
    }

    function onPushDone (err) {
      t.ok(err instanceof Error, 'push callback should catch the error')
      t.match(err.message, /test error/, 'error message should be "test error"')
    }

    var q = buildQueue(failingWorker, 1)
    q.error(onQueueError)
    q.push(42, onPushDone)
  })