Update segment cleanup to support object storage (#2876)
* Add support for S3 cleanup + standardize firing cleanup. Closes #2646
* fix: manually fix post-merge
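Broadly, "standardize firing cleanup" means every storage provider exposes the same Cleanup entry point that the streaming pipeline can fire, whether segments live on local disk or in an S3 bucket. The sketch below shows the shape this implies; the interface name and method set are illustrative assumptions, not the actual owncast definitions.

	// Illustrative only: names and method set are assumptions, not the real owncast API.
	type StorageProvider interface {
		Setup() error
		Save(filePath string, retryCount int) (string, error)
		Cleanup() error
	}

	// The pipeline can then fire cleanup identically for any provider:
	//	if err := storage.Cleanup(); err != nil {
	//		log.Errorln("storage cleanup failed:", err)
	//	}
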
@@ -6,16 +6,19 @@ import (
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/owncast/owncast/core/data"
	"github.com/owncast/owncast/utils"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/owncast/owncast/config"
@@ -23,13 +26,8 @@ import (

// S3Storage is the s3 implementation of a storage provider.
type S3Storage struct {
	sess *session.Session

	// If we try to upload a playlist but it is not yet on disk
	// then keep a reference to it here.
	queuedPlaylistUpdates map[string]string

	uploader *s3manager.Uploader
	sess     *session.Session
	s3Client *s3.S3
	host     string

	s3Endpoint       string
@@ -40,6 +38,12 @@ type S3Storage struct {
	s3Secret         string
	s3ACL            string
	s3ForcePathStyle bool

	// If we try to upload a playlist but it is not yet on disk
	// then keep a reference to it here.
	queuedPlaylistUpdates map[string]string

	uploader *s3manager.Uploader
}

// NewS3Storage returns a new S3Storage instance.
@@ -55,6 +59,7 @@ func (s *S3Storage) Setup() error {

	s3Config := data.GetS3Config()
	customVideoServingEndpoint := data.GetVideoServingEndpoint()

	if customVideoServingEndpoint != "" {
		s.host = customVideoServingEndpoint
	} else {
@@ -71,6 +76,7 @@ func (s *S3Storage) Setup() error {
	s.s3ForcePathStyle = s3Config.ForcePathStyle

	s.sess = s.connectAWS()
	s.s3Client = s3.New(s.sess)

	s.uploader = s3manager.NewUploader(s.sess)

@@ -184,6 +190,21 @@ func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
	return response.Location, nil
}

func (s *S3Storage) Cleanup() error {
	// Determine how many files we should keep on S3 storage
	maxNumber := data.GetStreamLatencyLevel().SegmentCount
	buffer := 20

	keys, err := s.getDeletableVideoSegmentsWithOffset(maxNumber + buffer)
	if err != nil {
		return err
	}

	s.deleteObjects(keys)

	return nil
}
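
A minimal, self-contained sketch of the retention math above (illustrative, not part of the commit): the newest SegmentCount + 20 objects form the retention window, and only what falls outside it becomes deletable. keepNewest is a hypothetical helper name.

	package main

	import "fmt"

	// keepNewest returns the entries outside a retention window of the `keep`
	// newest items; the slice is assumed to be sorted newest-first, mirroring
	// retrieveAllVideoSegments below.
	func keepNewest(keys []string, keep int) []string {
		if keep >= len(keys) {
			return nil // nothing old enough to delete
		}
		return keys[keep:]
	}

	func main() {
		segmentCount, buffer := 4, 20 // e.g. a 4-segment latency level plus the fixed buffer
		segments := make([]string, 30)
		for i := range segments {
			segments[i] = fmt.Sprintf("stream-%02d.ts", i) // index 0 = newest
		}
		deletable := keepNewest(segments, segmentCount+buffer)
		fmt.Println(len(deletable), "of", len(segments), "segments would be deleted") // 6 of 30
	}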

func (s *S3Storage) connectAWS() *session.Session {
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConnsPerHost = 100
@@ -213,3 +234,73 @@ func (s *S3Storage) connectAWS() *session.Session {
	}
	return sess
}

func (s *S3Storage) getDeletableVideoSegmentsWithOffset(offset int) ([]s3object, error) {
	objectsToDelete, err := s.retrieveAllVideoSegments()
	if err != nil {
		return nil, err
	}

	objectsToDelete = objectsToDelete[offset : len(objectsToDelete)-1]

	return objectsToDelete, nil
}
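
Worth noting: the slice above assumes the bucket holds more than offset+1 video segments; with fewer objects the bounds are invalid and the expression panics. A defensive variant (a sketch only, not what this commit ships; guardedWindow is a made-up helper) could guard the length first while keeping the same window:

	// guardedWindow is a hypothetical bounds-checked version of the slice above.
	func guardedWindow(objects []s3object, offset int) []s3object {
		if len(objects) <= offset+1 {
			return nil // fewer segments than the retention window; nothing to delete
		}
		return objects[offset : len(objects)-1]
	}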

func (s *S3Storage) deleteObjects(objects []s3object) {
	keys := make([]*s3.ObjectIdentifier, len(objects))
	for i, object := range objects {
		keys[i] = &s3.ObjectIdentifier{Key: aws.String(object.key)}
	}

	log.Debugln("Deleting", len(keys), "objects from S3 bucket:", s.s3Bucket)

	deleteObjectsRequest := &s3.DeleteObjectsInput{
		Bucket: aws.String(s.s3Bucket),
		Delete: &s3.Delete{
			Objects: keys,
			Quiet:   aws.Bool(true),
		},
	}

	_, err := s.s3Client.DeleteObjects(deleteObjectsRequest)
	if err != nil {
		log.Errorf("Unable to delete objects from bucket %q, %v\n", s.s3Bucket, err)
	}
}
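
One practical limit to keep in mind: the S3 DeleteObjects API accepts at most 1,000 keys per request, so the single call above only works while cleanup keeps pace. A batching variant could look roughly like this (a sketch; deleteInBatches is a made-up helper reusing this file's S3Storage fields):

	// deleteInBatches splits the key list into chunks of at most 1000,
	// the per-request limit of DeleteObjects.
	func (s *S3Storage) deleteInBatches(keys []*s3.ObjectIdentifier) {
		const maxKeysPerRequest = 1000
		for start := 0; start < len(keys); start += maxKeysPerRequest {
			end := start + maxKeysPerRequest
			if end > len(keys) {
				end = len(keys)
			}
			_, err := s.s3Client.DeleteObjects(&s3.DeleteObjectsInput{
				Bucket: aws.String(s.s3Bucket),
				Delete: &s3.Delete{Objects: keys[start:end], Quiet: aws.Bool(true)},
			})
			if err != nil {
				log.Errorf("Unable to delete objects from bucket %q, %v", s.s3Bucket, err)
			}
		}
	}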

func (s *S3Storage) retrieveAllVideoSegments() ([]s3object, error) {
	allObjectsListRequest := &s3.ListObjectsInput{
		Bucket: aws.String(s.s3Bucket),
	}

	// Fetch all objects in the bucket
	allObjectsListResponse, err := s.s3Client.ListObjects(allObjectsListRequest)
	if err != nil {
		return nil, errors.Wrap(err, "Unable to fetch list of items in bucket for cleanup")
	}

	// Filter out non-video segments
	allObjects := []s3object{}
	for _, item := range allObjectsListResponse.Contents {
		if !strings.HasSuffix(*item.Key, ".ts") {
			continue
		}

		allObjects = append(allObjects, s3object{
			key:          *item.Key,
			lastModified: *item.LastModified,
		})
	}

	// Sort the results by timestamp
	sort.Slice(allObjects, func(i, j int) bool {
		return allObjects[i].lastModified.After(allObjects[j].lastModified)
	})

	return allObjects, nil
}
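
Similarly, a single ListObjects call returns at most 1,000 keys, so a long-running stream with many segments could outgrow one request. A paginated variant (a sketch only; retrieveAllVideoSegmentsPaged is a hypothetical name) could use the SDK's ListObjectsPages helper:

	// retrieveAllVideoSegmentsPaged walks every page of the bucket listing
	// before filtering and sorting, instead of relying on the first 1000 keys.
	func (s *S3Storage) retrieveAllVideoSegmentsPaged() ([]s3object, error) {
		allObjects := []s3object{}
		err := s.s3Client.ListObjectsPages(&s3.ListObjectsInput{
			Bucket: aws.String(s.s3Bucket),
		}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
			for _, item := range page.Contents {
				if strings.HasSuffix(*item.Key, ".ts") {
					allObjects = append(allObjects, s3object{key: *item.Key, lastModified: *item.LastModified})
				}
			}
			return true // keep paging
		})
		if err != nil {
			return nil, errors.Wrap(err, "Unable to fetch list of items in bucket for cleanup")
		}

		sort.Slice(allObjects, func(i, j int) bool {
			return allObjects[i].lastModified.After(allObjects[j].lastModified)
		})
		return allObjects, nil
	}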

type s3object struct {
	key          string
	lastModified time.Time
}