Point playlist at local files instead of IPFS to test
This commit is contained in:
16
ffmpeg.go
16
ffmpeg.go
@@ -2,17 +2,25 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func pipeTest() {
|
func startFfmpeg() {
|
||||||
ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -preset ultrafast -f hls -hls_list_size 30 -hls_time 10 -strftime 1 -use_localtime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8"
|
outputDir := "webroot"
|
||||||
|
chunkLength := "4"
|
||||||
|
|
||||||
out, err := exec.Command("bash", "-c", ffmpegCmd).Output()
|
log.Printf("Starting transcoder with segments saving to %s.", outputDir)
|
||||||
|
|
||||||
|
// ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time 10 -strftime 1 -use_localtime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8"
|
||||||
|
|
||||||
|
ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " + chunkLength + " -strftime 1 -use_localtime 1 -hls_segment_filename '" + outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8"
|
||||||
|
fmt.Println(ffmpegCmd)
|
||||||
|
|
||||||
|
_, err := exec.Command("bash", "-c", ffmpegCmd).Output()
|
||||||
verifyError(err)
|
verifyError(err)
|
||||||
fmt.Println(string(out))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyError(e error) {
|
func verifyError(e error) {
|
||||||
|
|||||||
16
handler.go
16
handler.go
@@ -29,17 +29,17 @@ func (h *Handler) OnServe(conn *rtmp.Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) OnConnect(timestamp uint32, cmd *rtmpmsg.NetConnectionConnect) error {
|
func (h *Handler) OnConnect(timestamp uint32, cmd *rtmpmsg.NetConnectionConnect) error {
|
||||||
log.Printf("OnConnect: %#v", cmd)
|
// log.Printf("OnConnect: %#v", cmd)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCreateStream) error {
|
func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCreateStream) error {
|
||||||
log.Printf("OnCreateStream: %#v", cmd)
|
// log.Printf("OnCreateStream: %#v", cmd)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error {
|
func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error {
|
||||||
log.Printf("OnPublish: %#v", cmd)
|
// log.Printf("OnPublish: %#v", cmd)
|
||||||
|
|
||||||
// (example) Reject a connection when PublishingName is empty
|
// (example) Reject a connection when PublishingName is empty
|
||||||
if cmd.PublishingName == "" {
|
if cmd.PublishingName == "" {
|
||||||
@@ -50,7 +50,6 @@ func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) err
|
|||||||
p := filepath.Join(
|
p := filepath.Join(
|
||||||
filepath.Clean(filepath.Join("./", fmt.Sprintf("%s.flv", "streampipe"))),
|
filepath.Clean(filepath.Join("./", fmt.Sprintf("%s.flv", "streampipe"))),
|
||||||
)
|
)
|
||||||
fmt.Println(p)
|
|
||||||
syscall.Mkfifo(p, 0666)
|
syscall.Mkfifo(p, 0666)
|
||||||
f, err := os.OpenFile(p, os.O_RDWR, os.ModeNamedPipe)
|
f, err := os.OpenFile(p, os.O_RDWR, os.ModeNamedPipe)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -65,13 +64,11 @@ func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) err
|
|||||||
}
|
}
|
||||||
h.flvEnc = enc
|
h.flvEnc = enc
|
||||||
|
|
||||||
go pipeTest()
|
go startFfmpeg()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var counter = 0
|
|
||||||
|
|
||||||
func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDataFrame) error {
|
func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDataFrame) error {
|
||||||
r := bytes.NewReader(data.Payload)
|
r := bytes.NewReader(data.Payload)
|
||||||
|
|
||||||
@@ -81,7 +78,7 @@ func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDat
|
|||||||
return nil // ignore
|
return nil // ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("SetDataFrame: Script = %#v", script)
|
// log.Printf("SetDataFrame: Script = %#v", script)
|
||||||
|
|
||||||
if err := h.flvEnc.Encode(&flvtag.FlvTag{
|
if err := h.flvEnc.Encode(&flvtag.FlvTag{
|
||||||
TagType: flvtag.TagTypeScriptData,
|
TagType: flvtag.TagTypeScriptData,
|
||||||
@@ -91,9 +88,6 @@ func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDat
|
|||||||
log.Printf("Failed to write script data: Err = %+v", err)
|
log.Printf("Failed to write script data: Err = %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
counter++
|
|
||||||
fmt.Println("-------------> " + string(counter))
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
8
ipfs.go
8
ipfs.go
@@ -43,7 +43,6 @@ func createIPFSDirectory(ipfs *icore.CoreAPI, directoryName string) {
|
|||||||
verifyError(err)
|
verifyError(err)
|
||||||
|
|
||||||
directoryHash = newlyCreatedDirectoryHash.String()
|
directoryHash = newlyCreatedDirectoryHash.String()
|
||||||
fmt.Println("Created directory hash " + directoryHash)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func save(filePath string, ipfs *icore.CoreAPI) string {
|
func save(filePath string, ipfs *icore.CoreAPI) string {
|
||||||
@@ -107,8 +106,6 @@ func setupPlugins(externalPluginsPath string) error {
|
|||||||
|
|
||||||
// Creates an IPFS node and returns its coreAPI
|
// Creates an IPFS node and returns its coreAPI
|
||||||
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) {
|
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) {
|
||||||
fmt.Println("CreateNode...")
|
|
||||||
|
|
||||||
// Open the repo
|
// Open the repo
|
||||||
repo, err := fsrepo.Open(repoPath)
|
repo, err := fsrepo.Open(repoPath)
|
||||||
verifyError(err)
|
verifyError(err)
|
||||||
@@ -142,10 +139,7 @@ func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.Ipfs
|
|||||||
}
|
}
|
||||||
|
|
||||||
func createTempRepo(ctx context.Context) (string, error) {
|
func createTempRepo(ctx context.Context) (string, error) {
|
||||||
fmt.Println("createTempRepo...")
|
|
||||||
|
|
||||||
repoPath, err := ioutil.TempDir("", "ipfs-shell")
|
repoPath, err := ioutil.TempDir("", "ipfs-shell")
|
||||||
fmt.Println(repoPath)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("failed to get temp dir: %s", err)
|
return "", fmt.Errorf("failed to get temp dir: %s", err)
|
||||||
}
|
}
|
||||||
@@ -259,7 +253,7 @@ func createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
|
|||||||
func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI {
|
func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI {
|
||||||
defer fmt.Println("---- IPFS node exited!")
|
defer fmt.Println("---- IPFS node exited!")
|
||||||
|
|
||||||
fmt.Println("IPFS node is running")
|
log.Println("IPFS node is running")
|
||||||
|
|
||||||
bootstrapNodes := []string{
|
bootstrapNodes := []string{
|
||||||
// IPFS Bootstrapper nodes.
|
// IPFS Bootstrapper nodes.
|
||||||
|
|||||||
3
main.go
3
main.go
@@ -6,6 +6,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
icore "github.com/ipfs/interface-go-ipfs-core"
|
icore "github.com/ipfs/interface-go-ipfs-core"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/yutopp/go-rtmp"
|
"github.com/yutopp/go-rtmp"
|
||||||
)
|
)
|
||||||
@@ -55,7 +56,7 @@ func startRTMPService() {
|
|||||||
srv := rtmp.NewServer(&rtmp.ServerConfig{
|
srv := rtmp.NewServer(&rtmp.ServerConfig{
|
||||||
OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) {
|
OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) {
|
||||||
l := log.StandardLogger()
|
l := log.StandardLogger()
|
||||||
//l.SetLevel(logrus.DebugLevel)
|
l.SetLevel(logrus.WarnLevel)
|
||||||
|
|
||||||
h := &Handler{}
|
h := &Handler{}
|
||||||
|
|
||||||
|
|||||||
@@ -35,12 +35,15 @@ func monitorVideoContent(path string, ipfs *icore.CoreAPI) {
|
|||||||
|
|
||||||
filesToUpload[filePath] = newObjectPath
|
filesToUpload[filePath] = newObjectPath
|
||||||
}
|
}
|
||||||
|
|
||||||
playlistBytes, err := ioutil.ReadFile(event.Path)
|
playlistBytes, err := ioutil.ReadFile(event.Path)
|
||||||
verifyError(err)
|
verifyError(err)
|
||||||
playlistString := string(playlistBytes)
|
playlistString := string(playlistBytes)
|
||||||
remotePlaylistString := generateRemotePlaylist(playlistString, filesToUpload)
|
|
||||||
writePlaylist(remotePlaylistString, "webroot/stream.m3u8")
|
if false {
|
||||||
|
playlistString = generateRemotePlaylist(playlistString, filesToUpload)
|
||||||
|
}
|
||||||
|
writePlaylist(playlistString, "webroot/stream.m3u8")
|
||||||
|
|
||||||
} else if filepath.Ext(event.Path) == ".ts" {
|
} else if filepath.Ext(event.Path) == ".ts" {
|
||||||
filesToUpload[filepath.Base(event.Path)] = ""
|
filesToUpload[filepath.Base(event.Path)] = ""
|
||||||
// copy(event.Path, "webroot/"+filepath.Base(event.Path))
|
// copy(event.Path, "webroot/"+filepath.Base(event.Path))
|
||||||
|
|||||||
Reference in New Issue
Block a user