插件开发

插件的定义

所谓的插件,没有什么固定的规则,只需要完成安装操作即可。插件可以实现任意的功能扩展,最常见的是实现某种传输协议用来推流或者拉流

插件的安装(注册)

下面是内置插件jessica的源码,代表了典型的插件安装

  1. package jessica
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "mime"
  6. "net/http"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. . "github.com/Monibuca/engine/v2"
  11. . "github.com/logrusorgru/aurora"
  12. )
  13. var config = new(ListenerConfig)
  14. var publicPath string
  15. func init() {
  16. plugin := &PluginConfig{
  17. Name: "Jessica",
  18. Type: PLUGIN_SUBSCRIBER,
  19. Config: config,
  20. Run: run,
  21. }
  22. InstallPlugin(plugin)
  23. publicPath = filepath.Join(plugin.Dir, "ui", "public")
  24. }
  25. func run() {
  26. Print(Green("server Jessica start at"), BrightBlue(config.ListenAddr))
  27. http.HandleFunc("/jessibuca/", jessibuca)
  28. log.Fatal(http.ListenAndServe(config.ListenAddr, http.HandlerFunc(WsHandler)))
  29. }
  30. func jessibuca(w http.ResponseWriter, r *http.Request) {
  31. filePath := strings.TrimPrefix(r.URL.Path, "/jessibuca")
  32. if mime := mime.TypeByExtension(path.Ext(filePath)); mime != "" {
  33. w.Header().Set("Content-Type", mime)
  34. }
  35. if f, err := ioutil.ReadFile(filepath.Join(publicPath, filePath)); err == nil {
  36. if _, err = w.Write(f); err != nil {
  37. w.WriteHeader(500)
  38. }
  39. } else {
  40. w.WriteHeader(404)
  41. }
  42. }

源码说明

  • init会在go项目启动最开始的时候执行,我们需要在引擎Run之前注册我们的插件。
  • 注册插件,是调用引擎提供的InstallPlugin函数,传入插件的关键信息。
  • 插件的名称Name必须是唯一的,只需要保证在项目中唯一即可。
  • 插件的Config属性是一个自定义的结构体,只需要保证配置文件的解析和这个结构体定义一致即可。
  • 当主程序读取配置文件完成解析后,会调用各个插件的Run函数,上面代码中执行了一个http的端口监听
  • jessica插件的界面需要读取一些静态资源,所以利用了Gateway的http服务,我们注册了一个路由。所有插件都可以共用Gateway插件的http服务,但要注意的是路由不可以有冲突。当然插件也可以自己创建http服务,启用不同的端口号。

开发插件的UI界面

步骤:

  1. 创建ui目录用于存放ui的源码和编译出的文件
  2. 创建ui/src/App.vue作为UI界面的入口
  3. 编译为vue lib 名称必须为plugin-[组件名小写] 供gateway调用

例如:下面的npm命令将ui/src/App.vue导出名为plugin-jessica 的Vue lib,导出的文件在项目下的ui/dist目录。

  1. "scripts": {
  2. "build": "vue-cli-service build --target lib --name plugin-jessica"
  3. }

组件的Props中可以配置插件配置项用于接收插件的配置信息 例如:

  1. props: {
  2. ListenAddr: String
  3. }

Gateway插件中提供的公共组件均可以使用,例如stream-table,可供展示所有的流信息

  1. <stream-table>
  2. <template v-slot="scope">
  3. <m-button @click="preview(scope)">预览</m-button>
  4. <template>
  5. </stream-table>

该组件有一个默认的作用域槽,可以用来扩展对每一个流的操作

Gateway插件中提供的Vuex对象,所有插件均可访问

  1. export default new Vuex.Store({
  2. state: {
  3. plugins: [],
  4. Address: location.hostname,
  5. NetWork: [],
  6. Streams: [],
  7. Memory: {
  8. Used: 0,
  9. Usage: 0
  10. },
  11. CPUUsage: 0,
  12. HardDisk: {
  13. Used: 0,
  14. Usage: 0
  15. },
  16. Children: {},
  17. engineInfo: {},
  18. },
  19. mutations: {
  20. update(state, payload) {
  21. Object.assign(state, payload)
  22. },
  23. },
  24. actions: {
  25. fetchEngineInfo({ commit }) {
  26. return window.ajax.getJSON(apiHost + "/api/sysInfo").then(engineInfo => commit("update", { engineInfo }))
  27. },
  28. fetchPlugins({ commit }) {
  29. return window.ajax.getJSON(apiHost + "/api/plugins").then(plugins => {
  30. plugins.sort((a, b) => a.Name > b.Name ? 1 : -1)
  31. commit("update", { plugins })
  32. return plugins
  33. })
  34. },
  35. fetchSummary({ commit }) {
  36. summaryES = new EventSource(apiHost + "/api/summary");
  37. summaryES.onmessage = evt => {
  38. if (!evt.data) return;
  39. let summary = JSON.parse(evt.data);
  40. summary.Address = location.hostname;
  41. if (!summary.Streams) summary.Streams = [];
  42. summary.Streams.sort((a, b) =>
  43. a.StreamPath > b.StreamPath ? 1 : -1
  44. );
  45. commit("update", summary)
  46. };
  47. },
  48. },
  49. })

开发无UI的插件

默认就是无UI的插件

开发订阅者插件

所谓订阅者就是用来从流媒体服务器接收音视频流的程序,例如RTMP协议执行play命令后、http-flv请求响应程序、websocket响应程序。内置插件中录制flv程序也是一个特殊的订阅者。 下面是http-flv插件的源码,供参考

  1. package hdl
  2. import (
  3. "log"
  4. "net/http"
  5. "strings"
  6. . "github.com/Monibuca/engine/v2"
  7. "github.com/Monibuca/engine/v2/avformat"
  8. . "github.com/logrusorgru/aurora"
  9. )
  10. var config = new(ListenerConfig)
  11. func init() {
  12. InstallPlugin(&PluginConfig{
  13. Name: "HDL",
  14. Type: PLUGIN_SUBSCRIBER,
  15. Config: config,
  16. Run: run,
  17. })
  18. }
  19. func run() {
  20. Print(Green("HDL start at "), BrightBlue(config.ListenAddr))
  21. log.Fatal(http.ListenAndServe(config.ListenAddr, http.HandlerFunc(HDLHandler)))
  22. }
  23. func HDLHandler(w http.ResponseWriter, r *http.Request) {
  24. sign := r.URL.Query().Get("sign")
  25. if err := AuthHooks.Trigger(sign); err != nil {
  26. w.WriteHeader(403)
  27. return
  28. }
  29. stringPath := strings.TrimLeft(r.RequestURI, "/")
  30. if strings.HasSuffix(stringPath, ".flv") {
  31. stringPath = strings.TrimRight(stringPath, ".flv")
  32. }
  33. if s := FindStream(stringPath); s != nil {
  34. //atomic.AddInt32(&hdlId, 1)
  35. w.Header().Set("Transfer-Encoding", "chunked")
  36. w.Header().Set("Content-Type", "video/x-flv")
  37. w.Write(avformat.FLVHeader)
  38. p := Subscriber{
  39. Sign: sign,
  40. OnData: func(packet *avformat.SendPacket) error {
  41. return avformat.WriteFLVTag(w, packet)
  42. },
  43. SubscriberInfo: SubscriberInfo{
  44. ID: r.RemoteAddr, Type: "FLV",
  45. },
  46. }
  47. p.Subscribe(stringPath)
  48. } else {
  49. w.WriteHeader(404)
  50. }
  51. }

其中,核心逻辑就是创建Subscriber对象,每一个订阅者需要提供OnData函数,用来接收来自发布者广播出来的音视频数据。 最后调用该对象的Subscribe函数进行播放。请注意:Subscribe函数会阻塞当前goroutine。

开发发布者插件

所谓发布者,就是提供音视频数据的程序,例如接收来自OBS、ffmpeg的推流的程序。内置插件中,集群功能里面有一个特殊的发布者,它接收来自源服务器的音视频数据,然后在本服务器中广播音视频。 以此为例,我们需要提供一个结构体定义来表示特定的发布者:

  1. type Receiver struct {
  2. Publisher
  3. io.Reader
  4. *bufio.Writer
  5. }

其中Publisher 是固定的,必须包含,且必须以组合继承的方式定义。其余的成员则是任意的。 发布者的发布动作需要特定条件的触发,例如在集群插件中,当本服务器有订阅者订阅了某个流,而该流并没有发布者的时候就会触发向源服务器拉流的函数:

  1. func PullUpStream(streamPath string) {
  2. addr, err := net.ResolveTCPAddr("tcp", config.OriginServer)
  3. if MayBeError(err) {
  4. return
  5. }
  6. conn, err := net.DialTCP("tcp", nil, addr)
  7. if MayBeError(err) {
  8. return
  9. }
  10. brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
  11. p := &Receiver{
  12. Reader: brw.Reader,
  13. Writer: brw.Writer,
  14. }
  15. if p.Publish(streamPath) {
  16. p.Type = "Cluster"
  17. p.WriteByte(MSG_SUBSCRIBE)
  18. p.WriteString(streamPath)
  19. p.WriteByte(0)
  20. p.Flush()
  21. for _, v := range p.Subscribers {
  22. p.Auth(v)
  23. }
  24. } else {
  25. return
  26. }
  27. defer p.Cancel()
  28. for cmd, err := brw.ReadByte(); !MayBeError(err); cmd, err = brw.ReadByte() {
  29. switch cmd {
  30. case MSG_AUDIO:
  31. if t, payload, err := p.readAVPacket(avformat.FLV_TAG_TYPE_AUDIO); err == nil {
  32. p.PushAudio(t, payload)
  33. }
  34. case MSG_VIDEO:
  35. if t, payload, err := p.readAVPacket(avformat.FLV_TAG_TYPE_VIDEO); err == nil && len(payload) > 2 {
  36. p.PushVideo(t, payload)
  37. }
  38. case MSG_AUTH:
  39. cmd, err = brw.ReadByte()
  40. if MayBeError(err) {
  41. return
  42. }
  43. bytes, err := brw.ReadBytes(0)
  44. if MayBeError(err) {
  45. return
  46. }
  47. subId := strings.Split(string(bytes[0:len(bytes)-1]), ",")[0]
  48. if v, ok := p.Subscribers[subId]; ok {
  49. if cmd != 1 {
  50. v.Cancel()
  51. }
  52. }
  53. default:
  54. log.Printf("unknown cmd:%v", cmd)
  55. }
  56. }
  57. }

正在该函数中会向源服务器建立tcp连接,然后发送特定命令表示需要拉流,当我们接收到源服务器的数据的时候,就调用PushVideo和PushAudio函数来广播音视频。

核心逻辑是调用Publisher的Publish以及PushVideo、PushAudio函数

开发钩子插件

钩子插件就是在服务器的关键逻辑处插入的函数调用,方便扩展服务器的功能,比如对连接进行验证,或者触发一些特殊的发布者。 目前提供的钩子包括

  • 当发布者开始发布时 OnPublishHooks.AddHook(onPublish) 例如:
  1. func onPublish(r *Stream) {
  2. for _, v := range r.Subscribers {
  3. if err := CheckSign(v.Sign); err != nil {
  4. v.Cancel()
  5. }
  6. }
  7. }

此时可以访问房间里面的订阅者,对其进行验证。

  • 当有订阅者订阅了某个流时,OnSubscribeHooks.AddHook(onSubscribe) 例如:
  1. func onSubscribe(s *Subscriber) {
  2. if s.Publisher == nil {
  3. go PullUpStream(s.StreamPath)
  4. }
  5. }

拉取源服务器的流