package main import ( "bytes" "fmt" "io" "log" "os" "path/filepath" "syscall" "github.com/pkg/errors" "github.com/yutopp/go-flv" flvtag "github.com/yutopp/go-flv/tag" "github.com/yutopp/go-rtmp" rtmpmsg "github.com/yutopp/go-rtmp/message" ) var _ rtmp.Handler = (*Handler)(nil) // Handler An RTMP connection handler type Handler struct { rtmp.DefaultHandler flvFile *os.File flvEnc *flv.Encoder } func (h *Handler) OnServe(conn *rtmp.Conn) { } func (h *Handler) OnConnect(timestamp uint32, cmd *rtmpmsg.NetConnectionConnect) error { log.Printf("OnConnect: %#v", cmd) return nil } func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCreateStream) error { log.Printf("OnCreateStream: %#v", cmd) return nil } func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error { log.Printf("OnPublish: %#v", cmd) // (example) Reject a connection when PublishingName is empty if cmd.PublishingName == "" { return errors.New("PublishingName is empty") } // Record streams as FLV! p := filepath.Join( filepath.Clean(filepath.Join("./", fmt.Sprintf("%s.flv", "streampipe"))), ) fmt.Println(p) syscall.Mkfifo(p, 0666) f, err := os.OpenFile(p, os.O_RDWR, os.ModeNamedPipe) if err != nil { return errors.Wrap(err, "Failed to create flv file") } h.flvFile = f enc, err := flv.NewEncoder(f, flv.FlagsAudio|flv.FlagsVideo) if err != nil { _ = f.Close() return errors.Wrap(err, "Failed to create flv encoder") } h.flvEnc = enc go pipeTest() return nil } var counter = 0 func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDataFrame) error { r := bytes.NewReader(data.Payload) var script flvtag.ScriptData if err := flvtag.DecodeScriptData(r, &script); err != nil { log.Printf("Failed to decode script data: Err = %+v", err) return nil // ignore } log.Printf("SetDataFrame: Script = %#v", script) if err := h.flvEnc.Encode(&flvtag.FlvTag{ TagType: flvtag.TagTypeScriptData, Timestamp: timestamp, Data: &script, }); err != nil { log.Printf("Failed to write script data: Err = %+v", err) } counter++ fmt.Println("-------------> " + string(counter)) return nil } func (h *Handler) OnAudio(timestamp uint32, payload io.Reader) error { var audio flvtag.AudioData if err := flvtag.DecodeAudioData(payload, &audio); err != nil { return err } flvBody := new(bytes.Buffer) if _, err := io.Copy(flvBody, audio.Data); err != nil { return err } audio.Data = flvBody // log.Printf("FLV Audio Data: Timestamp = %d, SoundFormat = %+v, SoundRate = %+v, SoundSize = %+v, SoundType = %+v, AACPacketType = %+v, Data length = %+v", // timestamp, // audio.SoundFormat, // audio.SoundRate, // audio.SoundSize, // audio.SoundType, // audio.AACPacketType, // len(flvBody.Bytes()), // ) if err := h.flvEnc.Encode(&flvtag.FlvTag{ TagType: flvtag.TagTypeAudio, Timestamp: timestamp, Data: &audio, }); err != nil { log.Printf("Failed to write audio: Err = %+v", err) } return nil } func (h *Handler) OnVideo(timestamp uint32, payload io.Reader) error { var video flvtag.VideoData if err := flvtag.DecodeVideoData(payload, &video); err != nil { return err } flvBody := new(bytes.Buffer) if _, err := io.Copy(flvBody, video.Data); err != nil { return err } video.Data = flvBody // log.Printf("FLV Video Data: Timestamp = %d, FrameType = %+v, CodecID = %+v, AVCPacketType = %+v, CT = %+v, Data length = %+v", // timestamp, // video.FrameType, // video.CodecID, // video.AVCPacketType, // video.CompositionTime, // len(flvBody.Bytes()), // ) if err := h.flvEnc.Encode(&flvtag.FlvTag{ TagType: flvtag.TagTypeVideo, Timestamp: timestamp, Data: &video, }); err != nil { log.Printf("Failed to write video: Err = %+v", err) } return nil } func (h *Handler) OnClose() { log.Printf("OnClose") if h.flvFile != nil { _ = h.flvFile.Close() } }