pub/sub

发布订阅模式

核心点

  • Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作主题(topic)。
  • 主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscriber) 从主题订阅消息。
  • 主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。

pub/sub解决了什么样的问题?

  • 耗时的问题,比如上传,格式转换、计算等其他耗时任务,消耗的时间大于http服务器的超时时间
  • 类似于im,一对一,一对多、群组等

技术选型

faye,最早用的时候是写ruby的时候,那时候为了用faye而了解node,因为node版本的faye效率比ruby的高非常多,也算是“因ruby得node”吧。

Simple pub/sub messaging for the web http://faye.jcoglan.com

集成

安装

  1. $ npm i -S faye

最简单例子

  1. var http = require('http'),
  2. faye = require('faye');
  3. var server = http.createServer(),
  4. bayeux = new faye.NodeAdapter({mount: '/'});
  5. bayeux.attach(server);
  6. server.listen(8000);

很明显,它是attach到http的server实例上的,所以我们也需要把koa改成对应的

使用类似于express的www写法

  1. #!/usr/bin/env node
  2. var faye = require('faye');
  3. var bayeux = new faye.NodeAdapter({
  4. mount : '/faye',
  5. timeout : 45
  6. });
  7. /**
  8. * Module dependencies.
  9. */
  10. var app = require('./app');
  11. // console.dir(app)
  12. var debug = require('debug')('demo:server');
  13. var http = require('http');
  14. /**
  15. * Get port from environment and store in Express.
  16. */
  17. var port = normalizePort(process.env.PORT || '3000');
  18. // app.set('port', port);
  19. /**
  20. * Create HTTP server.
  21. */
  22. var server = http.createServer(app.callback());
  23. bayeux.attach(server);
  24. /**
  25. * Listen on provided port, on all network interfaces.
  26. */
  27. server.listen(port);
  28. server.on('error', onError);
  29. server.on('listening', onListening);
  30. /**
  31. * Normalize a port into a number, string, or false.
  32. */
  33. function normalizePort(val) {
  34. var port = parseInt(val, 10);
  35. if (isNaN(port)) {
  36. // named pipe
  37. return val;
  38. }
  39. if (port >= 0) {
  40. // port number
  41. return port;
  42. }
  43. return false;
  44. }
  45. /**
  46. * Event listener for HTTP server "error" event.
  47. */
  48. function onError(error) {
  49. if (error.syscall !== 'listen') {
  50. throw error;
  51. }
  52. var bind = typeof port === 'string'
  53. ? 'Pipe ' + port
  54. : 'Port ' + port;
  55. // handle specific listen errors with friendly messages
  56. switch (error.code) {
  57. case 'EACCES':
  58. console.error(bind + ' requires elevated privileges');
  59. process.exit(1);
  60. break;
  61. case 'EADDRINUSE':
  62. console.error(bind + ' is already in use');
  63. process.exit(1);
  64. break;
  65. default:
  66. throw error;
  67. }
  68. }
  69. /**
  70. * Event listener for HTTP server "listening" event.
  71. */
  72. function onListening() {
  73. var addr = server.address();
  74. var bind = typeof addr === 'string'
  75. ? 'pipe ' + addr
  76. : 'port ' + addr.port;
  77. debug('Listening on ' + bind);
  78. }

其实核心很简单

  1. var server = http.createServer(app.callback());

这是koa和http的集成,所以有了server实例就可以加faye了。

  1. bayeux.attach(server);

至此,一个建的服务就构建好。

模式1:客户端订阅,服务器端发布

构建前端

  1. extends layouts/layout
  2. block content
  3. h1= title
  4. p Welcome to StuQ Koa小班课!
  5. div
  6. a(href='/users/register') 注册
  7. span |
  8. a(href='/users/login') 登录
  9. span |
  10. a(href='/users/logout') 销毁
  11. script(src="http://127.0.0.1:3000/faye/client.js")
  12. script.
  13. var client = new Faye.Client('http://127.0.0.1:3000/faye', {
  14. timeout : 120,
  15. retry : 5
  16. });
  17. Logger = {
  18. incoming: function(message, callback) {
  19. console.log('incoming', message);
  20. callback(message);
  21. },
  22. outgoing: function(message, callback) {
  23. console.log('outgoing', message);
  24. callback(message);
  25. }
  26. };
  27. client.addExtension(Logger);
  28. client.on('transport:down', function() {
  29. // the client is offline
  30. });
  31. client.on('transport:up', function() {
  32. // the client is online
  33. });
  34. var subscription = client.subscribe('/messages', function(message) {
  35. // handle message
  36. alert(message.text);
  37. });

服务器端发布

  1. "use strict"
  2. var Faye = require('faye');
  3. const router = require('koa-router')()
  4. var client = new Faye.Client('http://127.0.0.1:3000/faye');
  5. console.log(client)
  6. // 首页
  7. router.get('/', ctx => {
  8. var session = ctx.session;
  9. setTimeout(function(){
  10. console.log('publish')
  11. client.publish('/messages', {
  12. text: 'Hello world'
  13. });
  14. },3000)
  15. if (session.current_user) {
  16. return ctx.render('index', {
  17. title: '演示session用法: 当前状态为已登录或注册成功'
  18. });
  19. } else {
  20. return ctx.render('index', {
  21. title: '演示session用法: 当前状态为游客'
  22. });
  23. }
  24. });
  25. module.exports = router

核心

  1. client.publish('/messages', {
  2. text: 'Hello world'
  3. });

模式2:客户端订阅,客户端发布

  1. <script type="text/javascript">
  2. var client = new Faye.Client('http://127.0.0.1:3000/faye', {
  3. timeout : 120,
  4. retry : 5
  5. });
  6. Logger = {
  7. incoming: function(message, callback) {
  8. console.log('incoming', message);
  9. callback(message);
  10. },
  11. outgoing: function(message, callback) {
  12. console.log('outgoing', message);
  13. callback(message);
  14. }
  15. };
  16. client.addExtension(Logger);
  17. client.on('transport:down', function() {
  18. // the client is offline
  19. });
  20. client.on('transport:up', function() {
  21. // the client is online
  22. });
  23. var subscription = client.subscribe('/foo', function(message) {
  24. // handle message
  25. console.log(message);
  26. });
  27. setTimeout(function(){
  28. var publication = client.publish('/foo', {text: 'Hi there, foo test'});
  29. publication.then(function() {
  30. alert('Message received by server!');
  31. }, function(error) {
  32. alert('There was a problem: ' + error.message);
  33. });
  34. },2000);
  35. </script>

核心

  1. var publication = client.publish('/foo', {text: 'Hi there, foo test'});

是不是有点像im?

效率问题

它有engine的概念,即存储引擎

https://faye.jcoglan.com/node/engines.html

  • faye-redis
  • faye-redis-sharded
  • faye-couchbase
  1. var faye = require('faye'),
  2. fayeRedis = require('faye-redis');
  3. var server = new faye.NodeAdapter({
  4. mount: '/faye',
  5. timeout: 45,
  6. engine: {
  7. type: fayeRedis,
  8. host: 'localhost',
  9. port: 6379
  10. }
  11. });

服务化

把这样的pub/sub丢出去,变成http接口,会不会更简单易用

https://github.com/i5ting/fayeserver