Remove IPFS. For #74
parent 4ec076cf34
commit b754ee01bf

README.md (12 lines changed)
@@ -98,7 +98,6 @@ Three ways of storing and distributing the video are supported.

1. [Locally](#local-file-distribution) via the built-in web server.
2. [S3-compatible storage](#s3-compatible-storage).
3. Experimental [IPFS](#ipfs) support.

### Local file distribution

@@ -110,17 +109,6 @@ Enable S3 support in `config.yaml` and add your access credentials. Files will b

Please read the [more detailed documentation about configuration of S3-Compatible Services](https://github.com/gabek/owncast/blob/master/doc/S3.md).
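For reference, a minimal sketch of what the `s3:` section of `config.yaml` could look like, assembled from the example config and the `s3` struct fields that appear later in this diff. The exact key names (for example `accessKey`) are assumptions, so defer to the S3 documentation linked above:

```yaml
s3:
  enabled: true
  accessKey: AKIAEXAMPLE      # assumed key name, based on the AccessKey struct field
  secret: lolomgqwtf49583949  # value taken from the example config in this diff
  region: us-west-2
  bucket: myvideo
```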
### IPFS

From the [IPFS website](https://ipfs.io/):

> Peer-to-peer IPFS saves big on bandwidth — up to 60% for video — making it possible to efficiently distribute high volumes of data without duplication.

Enable experimental IPFS support and your video will be distributed through the [IPFS network](https://ipfs.io/#how). In this scenario viewers will stream the video from IPFS nodes instead of the server running the service. This is free but **can be very slow**. It can also be just fine, you'll have to experiment for yourself. It can sometimes take too long for the network to get the video to the user, resulting in delays and heavy buffering. Try it if you like and make any suggestions on how to make it better so everyone can have free global video distribution without paying for a CDN or a 3rd party storage service.

By editing the config file you can change what IPFS gateway server is used, and you can experiment with [trying different ones](https://ipfs.github.io/public-gateway-checker/).
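The removed example config at the end of this diff shows the shape of that section; enabling IPFS would have looked roughly like this (a sketch based on that example, not current documentation):

```yaml
ipfs:
  enabled: true            # the shipped example defaults to false
  gateway: https://ipfs.io # any public gateway can be substituted here
```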

## Building from Source

1. Ensure you have the gcc compiler configured.
@@ -19,7 +19,6 @@ type config struct {
	EnableDebugFeatures bool            `yaml:"-"`
	FFMpegPath          string          `yaml:"ffmpegPath"`
	Files               files           `yaml:"files"`
	IPFS                ipfs            `yaml:"ipfs"`
	InstanceDetails     InstanceDetails `yaml:"instanceDetails"`
	PrivateHLSPath      string          `yaml:"privateHLSPath"`
	PublicHLSPath       string          `yaml:"publicHLSPath"`

@@ -78,11 +77,6 @@ type files struct {
	MaxNumberInPlaylist int `yaml:"maxNumberInPlaylist"`
}

type ipfs struct {
	Enabled bool   `yaml:"enabled"`
	Gateway string `yaml:"gateway"`
}

//s3 is for configuring the s3 integration
type s3 struct {
	Enabled bool `yaml:"enabled"`

@@ -119,10 +113,6 @@ func (c *config) verifySettings() error {
		return errors.New("No stream key set. Please set one in your config file.")
	}

	if c.S3.Enabled && c.IPFS.Enabled {
		return errors.New("s3 and IPFS support cannot be enabled at the same time; choose one")
	}

	if c.S3.Enabled {
		if c.S3.AccessKey == "" || c.S3.Secret == "" {
			return errors.New("s3 support requires an access key and secret")
@@ -180,7 +180,7 @@ func NewTranscoder() Transcoder {
	transcoder.hlsPlaylistLength = config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist()

	var outputPath string
	if config.Config.S3.Enabled || config.Config.IPFS.Enabled {
	if config.Config.S3.Enabled {
		// Segments are not available via the local HTTP server
		outputPath = config.Config.GetPrivateHLSSavePath()
	} else {
@@ -11,10 +11,7 @@ var (
)

func setupStorage() error {
	if config.Config.IPFS.Enabled {
		_storage = &storageproviders.IPFSStorage{}
		usingExternalStorage = true
	} else if config.Config.S3.Enabled {
	if config.Config.S3.Enabled {
		_storage = &storageproviders.S3Storage{}
		usingExternalStorage = true
	}
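For orientation, here is a sketch of the storage-provider contract that `IPFSStorage` (the file deleted below) and `S3Storage` appear to satisfy. The interface name comes from the comment in the deleted file, but the grouping and exact signatures are inferred, not the project's definitive API:

```go
package storageproviders

import "github.com/gabek/owncast/models"

// ChunkStorageProvider is a hypothetical rendering of the contract implied by
// the methods of the deleted IPFSStorage type below; owncast's real interface
// may be shaped differently.
type ChunkStorageProvider interface {
	// Setup prepares the provider (connections, credentials, directories).
	Setup() error
	// Save uploads a local segment file and returns its remote identifier/URL.
	Save(filePath string, retryCount int) (string, error)
	// GenerateRemotePlaylist rewrites an HLS playlist so segment entries
	// point at their uploaded, remote counterparts.
	GenerateRemotePlaylist(playlist string, variant models.Variant) string
}
```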
@@ -1,319 +0,0 @@
package storageproviders

import (
	"bufio"
	"context"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"sync"

	log "github.com/sirupsen/logrus"

	config "github.com/ipfs/go-ipfs-config"
	files "github.com/ipfs/go-ipfs-files"
	icore "github.com/ipfs/interface-go-ipfs-core"
	"github.com/ipfs/interface-go-ipfs-core/options"
	"github.com/ipfs/interface-go-ipfs-core/path"
	peer "github.com/libp2p/go-libp2p-peer"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"

	"github.com/ipfs/go-ipfs/core"
	"github.com/ipfs/go-ipfs/core/coreapi"
	"github.com/ipfs/go-ipfs/core/corehttp"
	"github.com/ipfs/go-ipfs/core/node/libp2p"
	"github.com/ipfs/go-ipfs/plugin/loader"
	"github.com/ipfs/go-ipfs/repo/fsrepo"

	ownconfig "github.com/gabek/owncast/config"
	"github.com/gabek/owncast/models"
)

//IPFSStorage is the ipfs implementation of the ChunkStorageProvider
type IPFSStorage struct {
	ipfs *icore.CoreAPI
	node *core.IpfsNode

	ctx           context.Context
	directoryHash string
	gateway       string
}

//Setup sets up the ipfs storage for saving the video to ipfs
func (s *IPFSStorage) Setup() error {
	log.Trace("Setting up IPFS for external storage of video. Please wait..")

	s.gateway = ownconfig.Config.IPFS.Gateway

	s.ctx = context.Background()

	ipfsInstance, node, err := s.createIPFSInstance()
	if err != nil {
		return err
	}

	s.ipfs = ipfsInstance
	s.node = node

	return s.createIPFSDirectory("./hls")
}

//Save saves the file to the ipfs storage
func (s *IPFSStorage) Save(filePath string, retryCount int) (string, error) {
	someFile, err := getUnixfsNode(filePath)
	if err != nil {
		return "", err
	}
	defer someFile.Close()

	opts := []options.UnixfsAddOption{
		options.Unixfs.Pin(false),
		// options.Unixfs.CidVersion(1),
		// options.Unixfs.RawLeaves(false),
		// options.Unixfs.Nocopy(false),
	}

	cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...)
	if err != nil {
		return "", err
	}

	// fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String())

	newHash, err := s.addFileToDirectory(cidFile, filepath.Base(filePath))
	if err != nil {
		return "", err
	}

	return s.gateway + newHash, nil
}

//GenerateRemotePlaylist implements the 'GenerateRemotePlaylist' method
func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, variant models.Variant) string {
	var newPlaylist = ""

	scanner := bufio.NewScanner(strings.NewReader(playlist))
	for scanner.Scan() {
		line := scanner.Text()
		if line[0:1] != "#" {
			fullRemotePath := variant.GetSegmentForFilename(line)
			if fullRemotePath == nil {
				line = ""
			} else {
				line = fullRemotePath.RemoteID
			}
		}

		newPlaylist = newPlaylist + line + "\n"
	}

	return newPlaylist
}

func setupPlugins(externalPluginsPath string) error {
	// Load any external plugins if available on externalPluginsPath
	plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins"))
	if err != nil {
		return fmt.Errorf("error loading plugins: %s", err)
	}

	// Load preloaded and external plugins
	if err := plugins.Initialize(); err != nil {
		return fmt.Errorf("error initializing plugins: %s", err)
	}

	if err := plugins.Inject(); err != nil {
		return fmt.Errorf("error initializing plugins: %s", err)
	}

	return nil
}

// Creates an IPFS node and returns its coreAPI
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) {
	// Open the repo
	repo, err := fsrepo.Open(repoPath)
	if err != nil {
		return nil, nil, err
	}

	// Construct the node

	nodeOptions := &core.BuildCfg{
		Online:  true,
		Routing: libp2p.DHTOption, // This option sets the node to be a full DHT node (both fetching and storing DHT Records)
		// Routing: libp2p.DHTClientOption, // This option sets the node to be a client DHT node (only fetching records)
		Repo: repo,
	}

	node, err := core.NewNode(ctx, nodeOptions)
	node.IsDaemon = true

	if err != nil {
		return nil, nil, err
	}

	// Attach the Core API to the constructed node
	coreAPI, err := coreapi.NewCoreAPI(node)
	if err != nil {
		return nil, nil, err
	}
	return coreAPI, node, nil
}

func createTempRepo(ctx context.Context) (string, error) {
	repoPath, err := ioutil.TempDir("", "ipfs-shell")
	if err != nil {
		return "", fmt.Errorf("failed to get temp dir: %s", err)
	}

	// Create a config with default options and a 2048 bit key
	cfg, err := config.Init(ioutil.Discard, 2048)

	if err != nil {
		return "", err
	}

	// Create the repo with the config
	err = fsrepo.Init(repoPath, cfg)
	if err != nil {
		return "", fmt.Errorf("failed to init ephemeral node: %s", err)
	}

	return repoPath, nil
}

// Spawns a node to be used just for this run (i.e. creates a tmp repo)
func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) {
	if err := setupPlugins(""); err != nil {
		return nil, nil, err
	}

	// Create a Temporary Repo
	repoPath, err := createTempRepo(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create temp repo: %s", err)
	}

	// Spawning an ephemeral IPFS node
	return createNode(ctx, repoPath)
}

func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
	var wg sync.WaitGroup
	peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers))
	for _, addrStr := range peers {
		addr, err := ma.NewMultiaddr(addrStr)
		if err != nil {
			return err
		}
		pii, err := peerstore.InfoFromP2pAddr(addr)
		if err != nil {
			return err
		}
		pi, ok := peerInfos[pii.ID]
		if !ok {
			pi = &peerstore.PeerInfo{ID: pii.ID}
			peerInfos[pi.ID] = pi
		}
		pi.Addrs = append(pi.Addrs, pii.Addrs...)
	}

	wg.Add(len(peerInfos))
	for _, peerInfo := range peerInfos {
		go func(peerInfo *peerstore.PeerInfo) {
			defer wg.Done()
			err := ipfs.Swarm().Connect(ctx, *peerInfo)
			if err != nil {
				log.Printf("failed to connect to %s: %s", peerInfo.ID, err)
			}
		}(peerInfo)
	}
	wg.Wait()
	return nil
}

func getUnixfsNode(path string) (files.Node, error) {
	st, err := os.Stat(path)
	if err != nil {
		return nil, err
	}

	f, err := files.NewSerialFile(path, false, st)

	if err != nil {
		return nil, err
	}

	return f, nil
}

func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) (string, error) {
	// fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String())
	directoryToAddToPath := path.New(s.directoryHash)
	newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath)

	return newDirectoryHash.String() + "/" + filename, err
}

func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
	// Spawn a node using a temporary path, creating a temporary repo for the run
	api, node, error := spawnEphemeral(s.ctx)
	// api, node, error := spawnDefault(ctx)
	return &api, node, error
}

func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI {
	defer log.Debug("IPFS node exited")

	log.Trace("IPFS node is running")

	bootstrapNodes := []string{
		// IPFS Bootstrapper nodes.
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",

		// IPFS Cluster Pinning nodes
		"/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA",

		"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",      // mars.i.ipfs.io
		"/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io

		// You can add more nodes here, for example, another IPFS node you might have running locally, mine was:
		// "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
		// "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
	}

	go connectToPeers(s.ctx, *s.ipfs, bootstrapNodes)

	addr := "/ip4/127.0.0.1/tcp/5001"
	var opts = []corehttp.ServeOption{
		corehttp.GatewayOption(true, "/ipfs", "/ipns"),
	}

	if err := corehttp.ListenAndServe(s.node, addr, opts...); err != nil {
		return
	}
}

func (s *IPFSStorage) createIPFSDirectory(directoryName string) error {
	directory, err := getUnixfsNode(directoryName)
	if err != nil {
		return err
	}
	defer directory.Close()

	newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory)
	if err != nil {
		return err
	}

	s.directoryHash = newlyCreatedDirectoryHash.String()

	return nil
}
@@ -74,9 +74,3 @@ s3:
  secret: lolomgqwtf49583949
  region: us-west-2
  bucket: myvideo

# Experimental IPFS support to store segments in the IPFS network.
# https://ipfs.io/#how.
ipfs:
  enabled: false
  gateway: https://ipfs.io
go.mod (11 lines changed)

@@ -4,20 +4,17 @@ go 1.14

require (
	github.com/aws/aws-sdk-go v1.32.1
	github.com/ipfs/go-ipfs v0.5.1
	github.com/ipfs/go-ipfs-config v0.5.3
	github.com/ipfs/go-ipfs-files v0.0.8
	github.com/ipfs/interface-go-ipfs-core v0.2.7
	github.com/libp2p/go-libp2p-peer v0.2.0
	github.com/libp2p/go-libp2p-peerstore v0.2.6
	github.com/kr/pretty v0.2.0 // indirect
	github.com/mattn/go-sqlite3 v1.14.0
	github.com/mssola/user_agent v0.5.2
	github.com/multiformats/go-multiaddr v0.2.2
	github.com/nareix/joy5 v0.0.0-20200710135721-d57196c8d506
	github.com/radovskyb/watcher v1.0.7
	github.com/sirupsen/logrus v1.6.0
	github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
	golang.org/x/net v0.0.0-20200602114024-627f9648deb9
	golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f // indirect
	golang.org/x/text v0.3.2 // indirect
	gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
	gopkg.in/yaml.v2 v2.3.0
)
@@ -5,7 +5,7 @@ type Segment struct {
	VariantIndex       int    // The bitrate variant
	FullDiskPath       string // Where it lives on disk
	RelativeUploadPath string // Path it should have remotely
	RemoteID           string // Used for IPFS
	RemoteID           string
}

//Variant represents a single bitrate variant and the segments that make it up
Loading…
x
Reference in New Issue
Block a user