diff --git a/config.go b/config.go index 56d61af81..0e4d301bd 100644 --- a/config.go +++ b/config.go @@ -17,6 +17,7 @@ type Config struct { Files Files `yaml:"files"` FFMpegPath string `yaml:"ffmpegPath"` WebServerPort int `yaml:"webServerPort"` + S3 S3 `yaml:"s3"` } type VideoSettings struct { @@ -36,6 +37,14 @@ type IPFS struct { Gateway string `yaml:"gateway"` } +type S3 struct { + Enabled bool `yaml:"enabled"` + AccessKey string `yaml:"accessKey"` + Secret string `yaml:"secret"` + Bucket string `yaml:"bucket"` + Region string `yaml:"region"` +} + func getConfig() Config { filePath := "config/config.yaml" @@ -50,10 +59,19 @@ func getConfig() Config { if err != nil { panic(err) } + + checkConfig(config) + + // fmt.Printf("%+v\n", config) + return config } func checkConfig(config Config) { + if config.S3.Enabled && config.IPFS.Enabled { + panic("S3 and IPFS support cannot be enabled at the same time. Choose one.") + } + if !fileExists(config.PrivateHLSPath) { panic(fmt.Sprintf("%s does not exist.", config.PrivateHLSPath)) } diff --git a/config/config-example.yaml b/config/config-example.yaml index dee8a0c73..2048ffd2b 100644 --- a/config/config-example.yaml +++ b/config/config-example.yaml @@ -14,4 +14,11 @@ files: ipfs: enabled: false - gateway: https://ipfs.io \ No newline at end of file + gateway: https://ipfs.io + +s3: + enabled: false + accessKey: ABC12342069 + secret: lolomgqwtf49583949 + region: us-west-2 + bucket: myvideo \ No newline at end of file diff --git a/ffmpeg.go b/ffmpeg.go index ab55ef132..d178ce993 100644 --- a/ffmpeg.go +++ b/ffmpeg.go @@ -13,7 +13,7 @@ func startFfmpeg(configuration Config) { var outputDir = configuration.PublicHLSPath var hlsPlaylistName = path.Join(configuration.PublicHLSPath, "stream.m3u8") - if configuration.IPFS.Enabled { + if configuration.IPFS.Enabled || configuration.S3.Enabled { outputDir = configuration.PrivateHLSPath hlsPlaylistName = path.Join(outputDir, "temp.m3u8") } diff --git a/go.mod b/go.mod index 68e9b7780..9c88b3fe0 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/gabek/video-stream go 1.14 require ( + github.com/aws/aws-sdk-go v1.31.9 github.com/ipfs/go-ipfs v0.5.1 github.com/ipfs/go-ipfs-config v0.5.3 github.com/ipfs/go-ipfs-files v0.0.8 diff --git a/go.sum b/go.sum index 500759ef9..a16fdc01f 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.31.9 h1:n+b34ydVfgC30j0Qm69yaapmjejQPW2BoDBX7Uy/tLI= +github.com/aws/aws-sdk-go v1.31.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -124,6 +126,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -400,6 +403,8 @@ github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -1045,6 +1050,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 h1:eDrdRpKgkcCqKZQwyZRyeFZgfqt37SL7Kv3tok06cKE= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= diff --git a/ipfs.go b/ipfs.go deleted file mode 100644 index 97cf02edc..000000000 --- a/ipfs.go +++ /dev/null @@ -1,287 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - - log "github.com/sirupsen/logrus" - - config "github.com/ipfs/go-ipfs-config" - files "github.com/ipfs/go-ipfs-files" - icore "github.com/ipfs/interface-go-ipfs-core" - "github.com/ipfs/interface-go-ipfs-core/options" - "github.com/ipfs/interface-go-ipfs-core/path" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" - - "github.com/ipfs/go-ipfs/core" - "github.com/ipfs/go-ipfs/core/coreapi" - "github.com/ipfs/go-ipfs/core/corehttp" - "github.com/ipfs/go-ipfs/core/node/libp2p" - "github.com/ipfs/go-ipfs/plugin/loader" - "github.com/ipfs/go-ipfs/repo/fsrepo" -) - -type IPFSStorage struct { - ipfs *icore.CoreAPI - node *core.IpfsNode - - ctx context.Context - directoryHash string - gateway string -} - -func (s *IPFSStorage) Setup(config Config) { - s.gateway = config.IPFS.Gateway - - s.ctx = context.Background() - - ipfsInstance, node, _ := s.createIPFSInstance() - s.ipfs = ipfsInstance - s.node = node - - s.createIPFSDirectory("./hls") -} - -func (s *IPFSStorage) Save(filePath string) string { - someFile, err := getUnixfsNode(filePath) - defer someFile.Close() - - if err != nil { - panic(fmt.Errorf("Could not get File: %s", err)) - } - - opts := []options.UnixfsAddOption{ - options.Unixfs.Pin(false), - // options.Unixfs.CidVersion(1), - // options.Unixfs.RawLeaves(false), - // options.Unixfs.Nocopy(false), - } - - cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...) - - if err != nil { - panic(fmt.Errorf("Could not add File: %s", err)) - } - - // fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String()) - - newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath)) - - return newHash -} - -func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, segments map[string]string) string { - for local, remote := range segments { - playlist = strings.ReplaceAll(playlist, local, s.gateway+remote) - } - return playlist -} - -func setupPlugins(externalPluginsPath string) error { - // Load any external plugins if available on externalPluginsPath - plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins")) - if err != nil { - return fmt.Errorf("error loading plugins: %s", err) - } - - // Load preloaded and external plugins - if err := plugins.Initialize(); err != nil { - return fmt.Errorf("error initializing plugins: %s", err) - } - - if err := plugins.Inject(); err != nil { - return fmt.Errorf("error initializing plugins: %s", err) - } - - return nil -} - -// Creates an IPFS node and returns its coreAPI -func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) { - // Open the repo - repo, err := fsrepo.Open(repoPath) - verifyError(err) - - if err != nil { - return nil, nil, err - } - - // Construct the node - - nodeOptions := &core.BuildCfg{ - Online: true, - Routing: libp2p.DHTOption, // This option sets the node to be a full DHT node (both fetching and storing DHT Records) - // Routing: libp2p.DHTClientOption, // This option sets the node to be a client DHT node (only fetching records) - Repo: repo, - } - - node, err := core.NewNode(ctx, nodeOptions) - node.IsDaemon = true - - if err != nil { - return nil, nil, err - } - - // Attach the Core API to the constructed node - coreAPI, err := coreapi.NewCoreAPI(node) - if err != nil { - return nil, nil, err - } - return coreAPI, node, nil -} - -func createTempRepo(ctx context.Context) (string, error) { - repoPath, err := ioutil.TempDir("", "ipfs-shell") - if err != nil { - return "", fmt.Errorf("failed to get temp dir: %s", err) - } - - // Create a config with default options and a 2048 bit key - cfg, err := config.Init(ioutil.Discard, 2048) - - if err != nil { - return "", err - } - - // Create the repo with the config - err = fsrepo.Init(repoPath, cfg) - if err != nil { - return "", fmt.Errorf("failed to init ephemeral node: %s", err) - } - - return repoPath, nil -} - -// Spawns a node to be used just for this run (i.e. creates a tmp repo) -func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) { - if err := setupPlugins(""); err != nil { - return nil, nil, err - } - - // Create a Temporary Repo - repoPath, err := createTempRepo(ctx) - if err != nil { - return nil, nil, fmt.Errorf("failed to create temp repo: %s", err) - } - - // Spawning an ephemeral IPFS node - coreAPI, node, err := createNode(ctx, repoPath) - return coreAPI, node, err -} - -func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error { - var wg sync.WaitGroup - peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers)) - for _, addrStr := range peers { - addr, err := ma.NewMultiaddr(addrStr) - if err != nil { - return err - } - pii, err := peerstore.InfoFromP2pAddr(addr) - if err != nil { - return err - } - pi, ok := peerInfos[pii.ID] - if !ok { - pi = &peerstore.PeerInfo{ID: pii.ID} - peerInfos[pi.ID] = pi - } - pi.Addrs = append(pi.Addrs, pii.Addrs...) - } - - wg.Add(len(peerInfos)) - for _, peerInfo := range peerInfos { - go func(peerInfo *peerstore.PeerInfo) { - defer wg.Done() - err := ipfs.Swarm().Connect(ctx, *peerInfo) - if err != nil { - log.Printf("failed to connect to %s: %s", peerInfo.ID, err) - } - }(peerInfo) - } - wg.Wait() - return nil -} - -func getUnixfsNode(path string) (files.Node, error) { - st, err := os.Stat(path) - if err != nil { - return nil, err - } - - f, err := files.NewSerialFile(path, false, st) - - if err != nil { - return nil, err - } - - return f, nil -} - -func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) string { - // fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String()) - directoryToAddToPath := path.New(s.directoryHash) - newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath) - - verifyError(err) - return newDirectoryHash.String() + "/" + filename -} - -func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) { - // Spawn a node using a temporary path, creating a temporary repo for the run - api, node, error := spawnEphemeral(s.ctx) - // api, node, error := spawnDefault(ctx) - return &api, node, error -} - -func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI { - defer log.Println("IPFS node exited") - - log.Println("IPFS node is running") - - bootstrapNodes := []string{ - // IPFS Bootstrapper nodes. - "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", - "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", - "/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb", - "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", - - // IPFS Cluster Pinning nodes - "/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA", - - "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io - "/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io - - // You can add more nodes here, for example, another IPFS node you might have running locally, mine was: - // "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L", - // "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L", - } - - go connectToPeers(s.ctx, *s.ipfs, bootstrapNodes) - - addr := "/ip4/127.0.0.1/tcp/5001" - var opts = []corehttp.ServeOption{ - corehttp.GatewayOption(true, "/ipfs", "/ipns"), - } - - if err := corehttp.ListenAndServe(s.node, addr, opts...); err != nil { - return - } -} - -func (s *IPFSStorage) createIPFSDirectory(directoryName string) { - directory, err := getUnixfsNode(directoryName) - verifyError(err) - defer directory.Close() - - newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory) - verifyError(err) - s.directoryHash = newlyCreatedDirectoryHash.String() -} diff --git a/main.go b/main.go index 3afa084d6..bd7504058 100644 --- a/main.go +++ b/main.go @@ -15,17 +15,24 @@ var server *Server var online = false func main() { - checkConfig(configuration) // resetDirectories() var hlsDirectoryPath = configuration.PublicHLSPath log.Println("Starting up. Please wait...") + var usingExternalStorage = false + if configuration.IPFS.Enabled { storage = &IPFSStorage{} - (storage).Setup(configuration) + usingExternalStorage = true + } else if configuration.S3.Enabled { + storage = &S3Storage{} + usingExternalStorage = true + } + if usingExternalStorage { + storage.Setup(configuration) hlsDirectoryPath = configuration.PrivateHLSPath go monitorVideoContent(hlsDirectoryPath, configuration, storage) } diff --git a/playlistMonitor.go b/playlistMonitor.go index 3061c778f..f883531ec 100644 --- a/playlistMonitor.go +++ b/playlistMonitor.go @@ -14,7 +14,7 @@ import ( var filesToUpload = make(map[string]string) func monitorVideoContent(pathToMonitor string, configuration Config, storage ChunkStorage) { - log.Printf("Using %s for IPFS files...\n", pathToMonitor) + log.Printf("Using %s files...\n", pathToMonitor) w := watcher.New() @@ -27,30 +27,24 @@ func monitorVideoContent(pathToMonitor string, configuration Config, storage Chu } if filepath.Base(event.Path) == "temp.m3u8" { - if configuration.IPFS.Enabled { - for filePath, objectID := range filesToUpload { - if objectID != "" { - continue - } - - newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, filePath)) - filesToUpload[filePath] = newObjectPath + for filePath, objectID := range filesToUpload { + if objectID != "" { + continue } + + newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, filePath)) + filesToUpload[filePath] = newObjectPath } playlistBytes, err := ioutil.ReadFile(event.Path) verifyError(err) playlistString := string(playlistBytes) - if configuration.IPFS.Enabled { - playlistString = storage.GenerateRemotePlaylist(playlistString, filesToUpload) - } + playlistString = storage.GenerateRemotePlaylist(playlistString, filesToUpload) writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8")) } else if filepath.Ext(event.Path) == ".ts" { - if configuration.IPFS.Enabled { - filesToUpload[filepath.Base(event.Path)] = "" - } + filesToUpload[filepath.Base(event.Path)] = "" } case err := <-w.Error: log.Fatalln(err) diff --git a/s3Storage.go b/s3Storage.go new file mode 100644 index 000000000..fffae537b --- /dev/null +++ b/s3Storage.go @@ -0,0 +1,115 @@ +package main + +import ( + "bufio" + "fmt" + "net/url" + "os" + "path" + "strings" + + log "github.com/sirupsen/logrus" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +type S3Storage struct { + sess *session.Session + host string + + s3Region string + s3Bucket string + s3AccessKey string + s3Secret string +} + +func (s *S3Storage) Setup(configuration Config) { + log.Println("Setting up S3 for external storage of video...") + + s.s3Region = configuration.S3.Region + s.s3Bucket = configuration.S3.Bucket + s.s3AccessKey = configuration.S3.AccessKey + s.s3Secret = configuration.S3.Secret + + s.sess = s.connectAWS() +} + +func (s *S3Storage) Save(filePath string) string { + // fmt.Println("Saving", filePath) + + file, err := os.Open(filePath) // For read access. + if err != nil { + log.Fatal(err) + } + + uploader := s3manager.NewUploader(s.sess) + + response, err := uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.s3Bucket), // Bucket to be used + Key: aws.String(filePath), // Name of the file to be saved + Body: file, // File + }) + + if err != nil { + panic(err) + } + + if s.host == "" { + // Take note of the root host location so we can regenerate full + // URLs to these files later when building the playlist in GenerateRemotePlaylist. + url, err := url.Parse(response.Location) + if err != nil { + fmt.Println(err) + } + + pathComponents := strings.Split(url.Path, "/") + pathComponents[len(pathComponents)-1] = "" + + s.host = fmt.Sprintf("%s://%s/%s", url.Scheme, url.Host, strings.Join(pathComponents, "/")) + } + + // fmt.Println("Uploaded", filePath, "to", response.Location) + + return filePath +} + +func (s *S3Storage) GenerateRemotePlaylist(playlist string, segments map[string]string) string { + var newPlaylist = "" + + scanner := bufio.NewScanner(strings.NewReader(playlist)) + for scanner.Scan() { + line := scanner.Text() + if line[0:1] != "#" { + line = path.Join(s.host, line) + } + + newPlaylist = newPlaylist + line + "\n" + } + + // fmt.Println(newPlaylist) + + return newPlaylist +} + +func (s S3Storage) connectAWS() *session.Session { + creds := credentials.NewStaticCredentials(s.s3AccessKey, s.s3Secret, "") + _, err := creds.Get() + if err != nil { + panic(err) + } + + sess, err := session.NewSession( + &aws.Config{ + Region: aws.String(s.s3Region), + Credentials: creds, + }, + ) + + if err != nil { + panic(err) + } + return sess +}