Social features / ActivityPub federation (#1629)

* Support webfinger requests for the live account. Closes https://github.com/owncast/owncast/issues/1193

* Support for actor requests. Returns response for live actor. Closes https://github.com/owncast/owncast/issues/1203

* Handle follow and unfollow requests. Closes
https://github.com/owncast/owncast/issues/1191 and https://github.com/owncast/owncast/issues/1205 and https://github.com/owncast/owncast/issues/1206 and https://github.com/owncast/owncast/issues/1194

* Add basic support for sending out text activities. For https://github.com/owncast/owncast/issues/1192

* Some error handling and passing of dynamic local account names.

* Add hardcoded example image attachment to test post

* Centralize the map of accounts and inboxes

* No longer disable the preview generator based on YP toggle

* Send a federated message to followers when stream starts. For https://github.com/owncast/owncast/issues/1192

* Placeholder for attaching tags

* Add image description

* Save and get to outbox persistence. Return using outbox endpoint for actor

* Pass payloads to be handled through the gochan

* Handle undo follow requests explitly, not all undo requests

* Add API for manually sending simple federated messages. Closes #1215

* Verify inbox requests. Closes #1321

* Add route to fetch a single AP object by ID. For #1329

* Add responses to fediverse nodeinfo requests

* Set and get federation config values for admin

* Handle host-meta requests

* Do not send out message if disabled. Use saved go live message.

* Require AP-compatible content types for AP-related requests

* Rename ap models to apmodels for clarity

* Change how content type matching takes place.

* io -> ioutil

* Add stub delete activity callback

* Handle likes and announces to surface engagement in chat. Part of #1229

* Append url to go live posts

* Do not require specific content types for nodeinfo requests

* Add follow engagement chat message via AP

* add owncast user-agent to requests

* Set note visibility to public (for now)

* Fix saving/fetching a single object

* Add support for x-nodeinfo2 responses

* Point to the dev admin branch for ap

* Bundle in dev admin for testing

* Add error logging

* Add AP middleware back

* Point to the new external compatible logo endpoint

* Clean up more AP logging to help testing

* Tweak go live text and link hashtags

* Fix bug in fetching init time

* Send update actor activities when server details/profile is updated

* Add federation config overview to web client config

* Add additional actor properties

* Make the AP middleware checking more flexible when looking at types

* First pass at remote fediverse follow flow. For #1371

* Added a basic AP actor followers endpoint

* WIP client followers API

* Add profile-page reference to webfinger response

* Add aliases to webfinger response

* Fix content-type returned to be expected activitypub+json

* First pass at followers api

* Point at local dev copy of go-fed/activity

* Add custom toot Hashtag objects to posts

* Store additional user details to followers table

* Fix AP followers endpoint. Closes #1204

* Add owncast hashtag as an invisible tag to go live posts

* Reject AP requests when it is disabled

* Add actor util for generating full account user from person object

* Verify inbox requests before performing any other work

* Accept actor update requests

* Fix linter errors in federation branch

* Migrate AP SQL to sqlc for type safe queries

* Use the @unclearParadigm REST parameter helper

* Fix verifying post ID on AP engagement

* WIP privacy/request approval

* Style the remote follow modal

* First pass at a followers list component w/ mock data. #1370

* Revert "Use the @unclearParadigm REST parameter helper"

This reverts commit c8af8a413f6f53e7d1a15a7d823ff28be2db3c23.

* Fix get followers API

* Add support for requiring approval. Closes https://github.com/owncast/owncast/issues/1208

* Handle Applications as Actors partly for PeerTube support

* add temp todo list

* check route on load, this might change later

* style followers

* account for just 1 tab case

* Remove mock data. Allow showing follow button even when there are no external actions defined

* Point to actual followers API

* Support fallback img for follower views

* Remove duplicate verification. Add some additional verbose logging

* Bundle dev admin

* Add type to host-meta webfinger template response

* Tweak remote follow modal content

* WIP federation followers refactor

* Do not send pointer to middleware

* Update admin

* Add setting for toggling displaying fediverse engagement. Closes #1404

* Add in-development admin

* Do not enable cors on admin followers api

* Add db migration for updating messages table

* Enable empty string go live messages to disable

* Remove debug messages

* Rework some ActivityPub handling.

Create new Actor->Person handling.
Create new Actor->Service handling.
Add engagement handlers to send chat events and store event objects.
Store inbound activities to new ap_inbound_activities table.

* Support federated engagement events.

Store them in the messages table and surface them via chat events.

* Support federated event engatement in the chat

* Tweak web UI followers handling

* Point go.mod at remote fork instead of local

* Update admin

* Merged in develop. Couple fixes

* Update dev admin

* Update fedi engagement posts.

- Fix incorrect action text.
- Add action icons.

* Set public as to instead of cc for ap msg

* Updated styling for federated actions in chat

* Add support for blocking federated domains. Closes #1209

* Force checking of https in verify step

* Update dev admin

* Return user scopes in chat history api. Closes #1586

* Update dev admin

* Add AP outbound request worker pool. Closes #1571

* Disable (temporarily?) owncast tag on AP posts

* Consolidate creating activity+notes in outbound AP messages

* Add inbox worker pool. Closes #1570

* Update dev admin bundle

* Clean up some logs

* Re-enable inbound verfication

* Save full IRI to outbox instead of path

* Reject if full IRI is not found in outbox

* Use full ActivityPub user account in chat event

* Fix and expand follower APIs

- Add missing IDs to AP follower endpoints
- Split AP follower endpoints into initial request and pages.
- Support pagination in AP requests.

* Include IRI in error message

* Hide chat toggle when chat is hidden. Closes #1606

* Updates to followers pagination

* Set default go live message

* Remove log

* indirect -> direct import

* Updates for inbound federated event handling.

- Keep track of existing events and reject duplicates.
- Change what is sent to chat for surfing federated engagement.
- Keep track if outbound events are automated "go live" events or not.

* Update chat federated engagement.

* Update dev admin.

* Move from being a person to a bot (service). Closes #1619

* Only set server init date if not already set

* Only save notes to outbox able

* Rework private-mode followers/approvals

* API for returning a list of federated actions for #1573

* Fix too-small follower cells and jumpy tabs. Closes #1616 and closes #1516

* Fix shortcuts getting fired on inputs. Fixes #1489 and #1201

* Add spinner, autoclose + other fixes to follow modal. Fixes #1593

* Fix fetching a single object by IRI

* SendFederationMessage -> SendFederatedMessage

* Autolink and create tag objects from manual posts. Closes #1620

* Update dev admin bundle

* Handle engagement from non-automated/live posts

* Reject federated engagement actions if they do not match a local post

* Update dev admin bundle

* A bunch of cleanup

* Fix unused assignments and logic

* Remove unused function

* Add content warning and sentive content flag if stream is NSFW. Closes #1624

* Disable fetching objects by IRI when in private mode. Closes #1623

* Update the error message of the remote follow dialog. closes #1622

* Update dev admin

* Fix NREs throwing in test content

* Fix query that wasn't properly filtering out hidden messages

* Test against user being disabled instead of message visibility

* Fix automated test NRE

* Update comment

* Adjust federated engagement chat views. Closes #1617

* Add additional index to users table

* Add support for removing followers/requests. Closes #1630

* Reject federated actions from blocked actors. #1631

* Use fallback avatar if it fails to load. Closes #1635

* Fix styling of follower list. Closes #1636

* Add basic blurb stating they should follow the server. Closes #1641

* Update dev admin

* Set default go live message in migration. Closes #1642

* Reset the messages table on 0.0.11 schema migration

* Fix js error with moderation actions. Closes #1621

* Add a bit more clarification on follow modal. Closes #1599

* Remove todos

* Split out actor and domain blocking checks

* Check for errors on default values being set

* Clean up actor rejection due to being blocked

* Update dev admin

* Add colon to error to make it easier to read

* Remove markdown rendering of go live message. Reorganize text. Remove content warning. Closes #1645

* Break out the sort+render messages logic so it can be fired on visibility change. Closes #1643

* Do not send profile updates if federation is disabled

* Save follow references to inbound activities table

* Update dev admin

* Add blocked actor test

* Remove the overloaded term of Follow from social links

* Fix test running in memory only

* Remove "just" in engagement messags

* Replace star with heart for like action.

* Update dev admin

* Explicitly set cc as public

* Remove overly using the stream name in fediverse engagement messages

* Some federated/follow UI tweaks

* Remove explicit cc and bcc as they are not required

* Explicitly set the audience

* Remove extra margin

* Add Join Fediverse button to follow modal. Closes #1651

* Do not allow multiple follows to send multiple events. Closes #1650

* Give events a min height

* Do not allow old posts to be liked/shared. Closes #1652

* Remove value from log message

* Alert followers on private mode toggle

* Ignore clicks to follow button if disabled

* Remove underline from action buttons

* Add moderator icon to join message

* Update admin

* Post-merge remove unused var

* Remove pointing at feature branch

Co-authored-by: Ginger Wong <omqmail@gmail.com>
This commit is contained in:
Gabe Kangas
2022-01-12 13:53:10 -08:00
committed by GitHub
parent c51d9cdbf4
commit 045a0a2afd
174 changed files with 7295 additions and 404 deletions

View File

@@ -0,0 +1,51 @@
package activitypub
import (
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/inbox"
"github.com/owncast/owncast/activitypub/outbox"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/workerpool"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
// Start will initialize and start the federation support.
func Start(datastore *data.Datastore) {
persistence.Setup(datastore)
workerpool.InitOutboundWorkerPool()
inbox.InitInboxWorkerPool()
StartRouter()
// Test
if data.GetPrivateKey() == "" {
privateKey, publicKey, err := crypto.GenerateKeys()
_ = data.SetPrivateKey(string(privateKey))
_ = data.SetPublicKey(string(publicKey))
if err != nil {
log.Errorln("Unable to get private key", err)
}
}
}
// SendLive will send a "Go Live" message to followers.
func SendLive() error {
return outbox.SendLive()
}
// SendPublicFederatedMessage will send an arbitrary provided message to followers.
func SendPublicFederatedMessage(message string) error {
return outbox.SendPublicMessage(message)
}
// GetFollowerCount will return the local tracked follower count.
func GetFollowerCount() (int64, error) {
return persistence.GetFollowerCount()
}
// GetPendingFollowRequests will return the pending follow requests.
func GetPendingFollowRequests() ([]models.Follower, error) {
return persistence.GetPendingFollowRequests()
}

View File

@@ -0,0 +1,88 @@
package apmodels
import (
"net/url"
"time"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/core/data"
)
// PrivacyAudience represents the audience for an activity.
type PrivacyAudience = string
const (
// PUBLIC is an audience meaning anybody can view the item.
PUBLIC PrivacyAudience = "https://www.w3.org/ns/activitystreams#Public"
)
// MakeCreateActivity will return a new Create activity with the provided ID.
func MakeCreateActivity(activityID *url.URL) vocab.ActivityStreamsCreate {
activity := streams.NewActivityStreamsCreate()
id := streams.NewJSONLDIdProperty()
id.Set(activityID)
activity.SetJSONLDId(id)
// CC the public if we're not treating ActivityPub as "private".
if !data.GetFederationIsPrivate() {
public, _ := url.Parse(PUBLIC)
to := streams.NewActivityStreamsToProperty()
to.AppendIRI(public)
activity.SetActivityStreamsTo(to)
audience := streams.NewActivityStreamsAudienceProperty()
audience.AppendIRI(public)
activity.SetActivityStreamsAudience(audience)
}
return activity
}
// MakeUpdateActivity will return a new Update activity with the provided aID.
func MakeUpdateActivity(activityID *url.URL) vocab.ActivityStreamsUpdate {
activity := streams.NewActivityStreamsUpdate()
id := streams.NewJSONLDIdProperty()
id.Set(activityID)
activity.SetJSONLDId(id)
// CC the public if we're not treating ActivityPub as "private".
if !data.GetFederationIsPrivate() {
public, _ := url.Parse(PUBLIC)
cc := streams.NewActivityStreamsCcProperty()
cc.AppendIRI(public)
activity.SetActivityStreamsCc(cc)
}
return activity
}
// MakeNote will return a new Note object.
func MakeNote(text string, noteIRI *url.URL, attributedToIRI *url.URL) vocab.ActivityStreamsNote {
note := streams.NewActivityStreamsNote()
content := streams.NewActivityStreamsContentProperty()
content.AppendXMLSchemaString(text)
note.SetActivityStreamsContent(content)
id := streams.NewJSONLDIdProperty()
id.Set(noteIRI)
note.SetJSONLDId(id)
published := streams.NewActivityStreamsPublishedProperty()
published.Set(time.Now())
note.SetActivityStreamsPublished(published)
attributedTo := attributedToIRI
attr := streams.NewActivityStreamsAttributedToProperty()
attr.AppendIRI(attributedTo)
note.SetActivityStreamsAttributedTo(attr)
// CC the public if we're not treating ActivityPub as "private".
if !data.GetFederationIsPrivate() {
public, _ := url.Parse(PUBLIC)
cc := streams.NewActivityStreamsCcProperty()
cc.AppendIRI(public)
note.SetActivityStreamsCc(cc)
}
return note
}

View File

@@ -0,0 +1,263 @@
package apmodels
import (
"fmt"
"net/url"
"time"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
// ActivityPubActor represents a single actor in handling ActivityPub activity.
type ActivityPubActor struct {
// ActorIRI is the IRI of the remote actor.
ActorIri *url.URL
// FollowRequestIRI is the unique identifier of the follow request.
FollowRequestIri *url.URL
// Inbox is the inbox URL of the remote follower
Inbox *url.URL
// Name is the display name of the follower.
Name string
// Username is the account username of the remote actor.
Username string
// FullUsername is the username@account.tld representation of the user.
FullUsername string
// Image is the avatar image of the Actor.
Image *url.URL
// W3IDSecurityV1PublicKey is the public key of the actor.
W3IDSecurityV1PublicKey vocab.W3IDSecurityV1PublicKeyProperty
// DisabledAt is the time, if any, this follower was blocked/removed.
DisabledAt *time.Time
}
// DeleteRequest represents a request for delete.
type DeleteRequest struct {
ActorIri string
}
// MakeActorFromPerson takes a full ActivityPub Person and returns our internal
// representation of an actor.
func MakeActorFromPerson(person vocab.ActivityStreamsPerson) ActivityPubActor {
apActor := ActivityPubActor{
ActorIri: person.GetJSONLDId().Get(),
Inbox: person.GetActivityStreamsInbox().GetIRI(),
Name: person.GetActivityStreamsName().Begin().GetXMLSchemaString(),
Username: person.GetActivityStreamsPreferredUsername().GetXMLSchemaString(),
FullUsername: GetFullUsernameFromPerson(person),
W3IDSecurityV1PublicKey: person.GetW3IDSecurityV1PublicKey(),
}
if person.GetActivityStreamsIcon() != nil && person.GetActivityStreamsIcon().Len() > 0 {
apActor.Image = person.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI()
}
return apActor
}
// MakeActorFromService takes a full ActivityPub Service and returns our internal
// representation of an actor.
func MakeActorFromService(service vocab.ActivityStreamsService) ActivityPubActor {
apActor := ActivityPubActor{
ActorIri: service.GetJSONLDId().Get(),
Inbox: service.GetActivityStreamsInbox().GetIRI(),
Name: service.GetActivityStreamsName().Begin().GetXMLSchemaString(),
Username: service.GetActivityStreamsPreferredUsername().GetXMLSchemaString(),
FullUsername: GetFullUsernameFromService(service),
W3IDSecurityV1PublicKey: service.GetW3IDSecurityV1PublicKey(),
}
if service.GetActivityStreamsIcon() != nil && service.GetActivityStreamsIcon().Len() > 0 {
apActor.Image = service.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI()
}
return apActor
}
// MakeActorPropertyWithID will return an actor property filled with the provided IRI.
func MakeActorPropertyWithID(idIRI *url.URL) vocab.ActivityStreamsActorProperty {
actor := streams.NewActivityStreamsActorProperty()
actor.AppendIRI(idIRI)
return actor
}
// MakeServiceForAccount will create a new local actor service with the the provided username.
func MakeServiceForAccount(accountName string) vocab.ActivityStreamsService {
actorIRI := MakeLocalIRIForAccount(accountName)
person := streams.NewActivityStreamsService()
nameProperty := streams.NewActivityStreamsNameProperty()
nameProperty.AppendXMLSchemaString(data.GetServerName())
person.SetActivityStreamsName(nameProperty)
preferredUsernameProperty := streams.NewActivityStreamsPreferredUsernameProperty()
preferredUsernameProperty.SetXMLSchemaString(accountName)
person.SetActivityStreamsPreferredUsername(preferredUsernameProperty)
inboxIRI := MakeLocalIRIForResource("/user/" + accountName + "/inbox")
inboxProp := streams.NewActivityStreamsInboxProperty()
inboxProp.SetIRI(inboxIRI)
person.SetActivityStreamsInbox(inboxProp)
needsFollowApprovalProperty := streams.NewActivityStreamsManuallyApprovesFollowersProperty()
needsFollowApprovalProperty.Set(data.GetFederationIsPrivate())
person.SetActivityStreamsManuallyApprovesFollowers(needsFollowApprovalProperty)
outboxIRI := MakeLocalIRIForResource("/user/" + accountName + "/outbox")
outboxProp := streams.NewActivityStreamsOutboxProperty()
outboxProp.SetIRI(outboxIRI)
person.SetActivityStreamsOutbox(outboxProp)
id := streams.NewJSONLDIdProperty()
id.Set(actorIRI)
person.SetJSONLDId(id)
publicKey := crypto.GetPublicKey(actorIRI)
publicKeyProp := streams.NewW3IDSecurityV1PublicKeyProperty()
publicKeyType := streams.NewW3IDSecurityV1PublicKey()
pubKeyIDProp := streams.NewJSONLDIdProperty()
pubKeyIDProp.Set(publicKey.ID)
publicKeyType.SetJSONLDId(pubKeyIDProp)
ownerProp := streams.NewW3IDSecurityV1OwnerProperty()
ownerProp.SetIRI(publicKey.Owner)
publicKeyType.SetW3IDSecurityV1Owner(ownerProp)
publicKeyPemProp := streams.NewW3IDSecurityV1PublicKeyPemProperty()
publicKeyPemProp.Set(publicKey.PublicKeyPem)
publicKeyType.SetW3IDSecurityV1PublicKeyPem(publicKeyPemProp)
publicKeyProp.AppendW3IDSecurityV1PublicKey(publicKeyType)
person.SetW3IDSecurityV1PublicKey(publicKeyProp)
if t, err := data.GetServerInitTime(); t != nil {
publishedDateProp := streams.NewActivityStreamsPublishedProperty()
publishedDateProp.Set(t.Time)
person.SetActivityStreamsPublished(publishedDateProp)
} else {
log.Errorln("unable to fetch server init time", err)
}
// Profile properties
// Avatar
userAvatarURLString := data.GetServerURL() + "/logo/external"
userAvatarURL, err := url.Parse(userAvatarURLString)
if err != nil {
log.Errorln("unable to parse user avatar url", userAvatarURLString, err)
}
image := streams.NewActivityStreamsImage()
imgProp := streams.NewActivityStreamsUrlProperty()
imgProp.AppendIRI(userAvatarURL)
image.SetActivityStreamsUrl(imgProp)
icon := streams.NewActivityStreamsIconProperty()
icon.AppendActivityStreamsImage(image)
person.SetActivityStreamsIcon(icon)
// Actor URL
urlProperty := streams.NewActivityStreamsUrlProperty()
urlProperty.AppendIRI(actorIRI)
person.SetActivityStreamsUrl(urlProperty)
// Profile header
headerImage := streams.NewActivityStreamsImage()
headerImgPropURL := streams.NewActivityStreamsUrlProperty()
headerImgPropURL.AppendIRI(userAvatarURL)
headerImage.SetActivityStreamsUrl(headerImgPropURL)
headerImageProp := streams.NewActivityStreamsImageProperty()
headerImageProp.AppendActivityStreamsImage(headerImage)
person.SetActivityStreamsImage(headerImageProp)
// Profile bio
summaryProperty := streams.NewActivityStreamsSummaryProperty()
summaryProperty.AppendXMLSchemaString(data.GetServerSummary())
person.SetActivityStreamsSummary(summaryProperty)
// Links
for _, link := range data.GetSocialHandles() {
addMetadataLinkToProfile(person, link.Platform, link.URL)
}
// Discoverable
discoverableProperty := streams.NewTootDiscoverableProperty()
discoverableProperty.Set(true)
person.SetTootDiscoverable(discoverableProperty)
// Followers
followersProperty := streams.NewActivityStreamsFollowersProperty()
followersURL := *actorIRI
followersURL.Path = actorIRI.Path + "/followers"
followersProperty.SetIRI(&followersURL)
person.SetActivityStreamsFollowers(followersProperty)
// Tags
tagProp := streams.NewActivityStreamsTagProperty()
for _, tagString := range data.GetServerMetadataTags() {
hashtag := MakeHashtag(tagString)
tagProp.AppendTootHashtag(hashtag)
}
person.SetActivityStreamsTag(tagProp)
// Work around an issue where a single attachment will not serialize
// as an array, so add another item to the mix.
if len(data.GetSocialHandles()) == 1 {
addMetadataLinkToProfile(person, "Owncast", "https://owncast.online")
}
return person
}
// GetFullUsernameFromPerson will return the user@host.tld formatted user given a person object.
func GetFullUsernameFromPerson(person vocab.ActivityStreamsPerson) string {
hostname := person.GetJSONLDId().GetIRI().Hostname()
username := person.GetActivityStreamsPreferredUsername().GetXMLSchemaString()
fullUsername := fmt.Sprintf("%s@%s", username, hostname)
return fullUsername
}
// GetFullUsernameFromService will return the user@host.tld formatted user given a service object.
func GetFullUsernameFromService(person vocab.ActivityStreamsService) string {
hostname := person.GetJSONLDId().GetIRI().Hostname()
username := person.GetActivityStreamsPreferredUsername().GetXMLSchemaString()
fullUsername := fmt.Sprintf("%s@%s", username, hostname)
return fullUsername
}
func addMetadataLinkToProfile(profile vocab.ActivityStreamsService, name string, url string) {
attachments := profile.GetActivityStreamsAttachment()
if attachments == nil {
attachments = streams.NewActivityStreamsAttachmentProperty()
}
displayName := name
socialHandle := models.GetSocialHandle(name)
if socialHandle != nil {
displayName = socialHandle.Platform
}
linkValue := fmt.Sprintf("<a href=\"%s\" rel=\"me nofollow noopener noreferrer\" target=\"_blank\">%s</a>", url, url)
attachment := streams.NewActivityStreamsObject()
attachmentProp := streams.NewJSONLDTypeProperty()
attachmentProp.AppendXMLSchemaString("PropertyValue")
attachment.SetJSONLDType(attachmentProp)
attachmentName := streams.NewActivityStreamsNameProperty()
attachmentName.AppendXMLSchemaString(displayName)
attachment.SetActivityStreamsName(attachmentName)
attachment.GetUnknownProperties()["value"] = linkValue
attachments.AppendActivityStreamsObject(attachment)
profile.SetActivityStreamsAttachment(attachments)
}

View File

@@ -0,0 +1,168 @@
package apmodels
import (
"io/ioutil"
"net/url"
"os"
"testing"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/core/data"
)
func makeFakeService() vocab.ActivityStreamsService {
iri, _ := url.Parse("https://fake.fediverse.server/user/mrfoo")
name := "Mr Foo"
username := "foodawg"
inbox, _ := url.Parse("https://fake.fediverse.server/user/mrfoo/inbox")
userAvatarURL, _ := url.Parse("https://fake.fediverse.server/user/mrfoo/avatar.png")
service := streams.NewActivityStreamsService()
id := streams.NewJSONLDIdProperty()
id.Set(iri)
service.SetJSONLDId(id)
nameProperty := streams.NewActivityStreamsNameProperty()
nameProperty.AppendXMLSchemaString(name)
service.SetActivityStreamsName(nameProperty)
preferredUsernameProperty := streams.NewActivityStreamsPreferredUsernameProperty()
preferredUsernameProperty.SetXMLSchemaString(username)
service.SetActivityStreamsPreferredUsername(preferredUsernameProperty)
inboxProp := streams.NewActivityStreamsInboxProperty()
inboxProp.SetIRI(inbox)
service.SetActivityStreamsInbox(inboxProp)
image := streams.NewActivityStreamsImage()
imgProp := streams.NewActivityStreamsUrlProperty()
imgProp.AppendIRI(userAvatarURL)
image.SetActivityStreamsUrl(imgProp)
icon := streams.NewActivityStreamsIconProperty()
icon.AppendActivityStreamsImage(image)
service.SetActivityStreamsIcon(icon)
return service
}
func TestMain(m *testing.M) {
dbFile, err := ioutil.TempFile(os.TempDir(), "owncast-test-db.db")
if err != nil {
panic(err)
}
data.SetupPersistence(dbFile.Name())
data.SetServerURL("https://my.cool.site.biz")
m.Run()
}
func TestMakeActorFromService(t *testing.T) {
service := makeFakeService()
actor := MakeActorFromService(service)
if actor.ActorIri != service.GetJSONLDId().GetIRI() {
t.Errorf("actor.ID = %v, want %v", actor.ActorIri, service.GetJSONLDId().GetIRI())
}
if actor.Name != service.GetActivityStreamsName().At(0).GetXMLSchemaString() {
t.Errorf("actor.Name = %v, want %v", actor.Name, service.GetActivityStreamsName().At(0).GetXMLSchemaString())
}
if actor.Username != service.GetActivityStreamsPreferredUsername().GetXMLSchemaString() {
t.Errorf("actor.Username = %v, want %v", actor.Username, service.GetActivityStreamsPreferredUsername().GetXMLSchemaString())
}
if actor.Inbox != service.GetActivityStreamsInbox().GetIRI() {
t.Errorf("actor.Inbox = %v, want %v", actor.Inbox.String(), service.GetActivityStreamsInbox().GetIRI())
}
if actor.Image != service.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().At(0).GetIRI() {
t.Errorf("actor.Image = %v, want %v", actor.Image, service.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().At(0).GetIRI())
}
}
func TestMakeActorPropertyWithID(t *testing.T) {
iri, _ := url.Parse("https://fake.fediverse.server/user/mrfoo")
actor := MakeActorPropertyWithID(iri)
if actor.Begin().GetIRI() != iri {
t.Errorf("actor.IRI = %v, want %v", actor.Begin().GetIRI(), iri)
}
}
func TestGetFullUsernameFromPerson(t *testing.T) {
expected := "foodawg@fake.fediverse.server"
person := makeFakeService()
username := GetFullUsernameFromService(person)
if username != expected {
t.Errorf("actor.Username = %v, want %v", username, expected)
}
}
func TestAddMetadataLinkToProfile(t *testing.T) {
person := makeFakeService()
addMetadataLinkToProfile(person, "my site", "https://my.cool.site.biz")
attchment := person.GetActivityStreamsAttachment().At(0)
nameValue := attchment.GetActivityStreamsObject().GetActivityStreamsName().At(0).GetXMLSchemaString()
expected := "my site"
if nameValue != expected {
t.Errorf("attachment name = %v, want %v", nameValue, expected)
}
propertyValue := attchment.GetActivityStreamsObject().GetUnknownProperties()["value"]
expected = `<a href="https://my.cool.site.biz" rel="me nofollow noopener noreferrer" target="_blank">https://my.cool.site.biz</a>`
if propertyValue != expected {
t.Errorf("attachment value = %v, want %v", propertyValue, expected)
}
}
func TestMakeServiceForAccount(t *testing.T) {
person := MakeServiceForAccount("accountname")
expectedIRI := "https://my.cool.site.biz/federation/user/accountname"
if person.GetJSONLDId().Get().String() != expectedIRI {
t.Errorf("actor.IRI = %v, want %v", person.GetJSONLDId().Get().String(), expectedIRI)
}
if person.GetActivityStreamsPreferredUsername().GetXMLSchemaString() != "accountname" {
t.Errorf("actor.PreferredUsername = %v, want %v", person.GetActivityStreamsPreferredUsername().GetXMLSchemaString(), expectedIRI)
}
expectedInbox := "https://my.cool.site.biz/federation/user/accountname/inbox"
if person.GetActivityStreamsInbox().GetIRI().String() != expectedInbox {
t.Errorf("actor.Inbox = %v, want %v", person.GetActivityStreamsInbox().GetIRI().String(), expectedInbox)
}
expectedOutbox := "https://my.cool.site.biz/federation/user/accountname/outbox"
if person.GetActivityStreamsOutbox().GetIRI().String() != expectedOutbox {
t.Errorf("actor.Outbox = %v, want %v", person.GetActivityStreamsOutbox().GetIRI().String(), expectedOutbox)
}
expectedFollowers := "https://my.cool.site.biz/federation/user/accountname/followers"
if person.GetActivityStreamsFollowers().GetIRI().String() != expectedFollowers {
t.Errorf("actor.Followers = %v, want %v", person.GetActivityStreamsFollowers().GetIRI().String(), expectedFollowers)
}
expectedName := "Owncast"
if person.GetActivityStreamsName().Begin().GetXMLSchemaString() != expectedName {
t.Errorf("actor.Name = %v, want %v", person.GetActivityStreamsName().Begin().GetXMLSchemaString(), expectedName)
}
expectedAvatar := "https://my.cool.site.biz/logo/external"
if person.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI().String() != expectedAvatar {
t.Errorf("actor.Avatar = %v, want %v", person.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI().String(), expectedAvatar)
}
expectedSummary := "Welcome to your new Owncast server! This description can be changed in the admin. Visit https://owncast.online/docs/configuration/ to learn more."
if person.GetActivityStreamsSummary().At(0).GetXMLSchemaString() != expectedSummary {
t.Errorf("actor.Summary = %v, want %v", person.GetActivityStreamsSummary().At(0).GetXMLSchemaString(), expectedSummary)
}
if person.GetActivityStreamsUrl().At(0).GetIRI().String() != expectedIRI {
t.Errorf("actor.URL = %v, want %v", person.GetActivityStreamsUrl().At(0).GetIRI().String(), expectedIRI)
}
}

View File

@@ -0,0 +1,24 @@
package apmodels
import (
"net/url"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
)
// MakeHashtag will create and return a mastodon toot hashtag object with the provided name.
func MakeHashtag(name string) vocab.TootHashtag {
u, _ := url.Parse("https://directory.owncast.online/tags/" + name)
hashtag := streams.NewTootHashtag()
hashtagName := streams.NewActivityStreamsNameProperty()
hashtagName.AppendXMLSchemaString("#" + name)
hashtag.SetActivityStreamsName(hashtagName)
hashtagHref := streams.NewActivityStreamsHrefProperty()
hashtagHref.Set(u)
hashtag.SetActivityStreamsHref(hashtagHref)
return hashtag
}

View File

@@ -0,0 +1,10 @@
package apmodels
import "net/http"
// InboxRequest represents an inbound request to the ActivityPub inbox.
type InboxRequest struct {
Request *http.Request
ForLocalAccount string
Body []byte
}

View File

@@ -0,0 +1,50 @@
package apmodels
import (
"net/url"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
)
// CreateCreateActivity will create a new Create Activity model with the provided ID and IRI.
func CreateCreateActivity(id string, localAccountIRI *url.URL) vocab.ActivityStreamsCreate {
objectID := MakeLocalIRIForResource(id)
message := MakeCreateActivity(objectID)
actorProp := streams.NewActivityStreamsActorProperty()
actorProp.AppendIRI(localAccountIRI)
message.SetActivityStreamsActor(actorProp)
return message
}
// AddImageAttachmentToNote will add the provided image URL to the provided note object.
func AddImageAttachmentToNote(note vocab.ActivityStreamsNote, image string) {
imageURL, err := url.Parse(image)
if err != nil {
return
}
attachments := note.GetActivityStreamsAttachment()
if attachments == nil {
attachments = streams.NewActivityStreamsAttachmentProperty()
}
urlProp := streams.NewActivityStreamsUrlProperty()
urlProp.AppendIRI(imageURL)
apImage := streams.NewActivityStreamsImage()
apImage.SetActivityStreamsUrl(urlProp)
imageProp := streams.NewActivityStreamsImageProperty()
imageProp.AppendActivityStreamsImage(apImage)
imageDescription := streams.NewActivityStreamsContentProperty()
imageDescription.AppendXMLSchemaString("Live stream preview")
apImage.SetActivityStreamsContent(imageDescription)
attachments.AppendActivityStreamsImage(apImage)
note.SetActivityStreamsAttachment(attachments)
}

View File

@@ -0,0 +1,62 @@
package apmodels
import (
"encoding/json"
"net/url"
"path"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
)
// MakeRemoteIRIForResource will create an IRI for a remote location.
func MakeRemoteIRIForResource(resourcePath string, host string) (*url.URL, error) {
generatedURL := "https://" + host
u, err := url.Parse(generatedURL)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "federation", resourcePath)
return u, nil
}
// MakeLocalIRIForResource will create an IRI for the local server.
func MakeLocalIRIForResource(resourcePath string) *url.URL {
host := data.GetServerURL()
u, err := url.Parse(host)
if err != nil {
log.Errorln("unable to parse local IRI url", host, err)
return nil
}
u.Path = path.Join(u.Path, "federation", resourcePath)
return u
}
// MakeLocalIRIForAccount will return a full IRI for the local server account username.
func MakeLocalIRIForAccount(account string) *url.URL {
host := data.GetServerURL()
u, err := url.Parse(host)
if err != nil {
log.Errorln("unable to parse local IRI account server url", err)
return nil
}
u.Path = path.Join(u.Path, "federation", "user", account)
return u
}
// Serialize will serialize an ActivityPub object to a byte slice.
func Serialize(obj vocab.Type) ([]byte, error) {
var jsonmap map[string]interface{}
jsonmap, _ = streams.Serialize(obj)
b, err := json.Marshal(jsonmap)
return b, err
}

View File

@@ -0,0 +1,43 @@
package apmodels
import (
"fmt"
)
// WebfingerResponse represents a Webfinger response.
type WebfingerResponse struct {
Aliases []string `json:"aliases"`
Subject string `json:"subject"`
Links []Link `json:"links"`
}
// Link represents a Webfinger response Link entity.
type Link struct {
Rel string `json:"rel"`
Type string `json:"type"`
Href string `json:"href"`
}
// MakeWebfingerResponse will create a new Webfinger response.
func MakeWebfingerResponse(account string, inbox string, host string) WebfingerResponse {
accountIRI := MakeLocalIRIForAccount(account)
return WebfingerResponse{
Subject: fmt.Sprintf("acct:%s@%s", account, host),
Aliases: []string{
accountIRI.String(),
},
Links: []Link{
{
Rel: "self",
Type: "application/activity+json",
Href: accountIRI.String(),
},
{
Rel: "http://webfinger.net/rel/profile-page",
Type: "text/html",
Href: accountIRI.String(),
},
},
}
}

View File

@@ -0,0 +1,58 @@
package controllers
import (
"net/http"
"strings"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/requests"
"github.com/owncast/owncast/core/data"
)
// ActorHandler handles requests for a single actor.
func ActorHandler(w http.ResponseWriter, r *http.Request) {
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
pathComponents := strings.Split(r.URL.Path, "/")
accountName := pathComponents[3]
if _, valid := data.GetFederatedInboxMap()[accountName]; !valid {
// User is not valid
w.WriteHeader(http.StatusNotFound)
return
}
// If this request is for an actor's inbox then pass
// the request to the inbox controller.
if len(pathComponents) == 5 && pathComponents[4] == "inbox" {
InboxHandler(w, r)
return
} else if len(pathComponents) == 5 && pathComponents[4] == "outbox" {
OutboxHandler(w, r)
return
} else if len(pathComponents) == 5 && pathComponents[4] == "followers" {
// followers list
FollowersHandler(w, r)
return
} else if len(pathComponents) == 5 && pathComponents[4] == "following" {
// following list (none)
w.WriteHeader(http.StatusNotFound)
return
}
actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
publicKey := crypto.GetPublicKey(actorIRI)
person := apmodels.MakeServiceForAccount(accountName)
if err := requests.WriteStreamResponse(person, w, publicKey); err != nil {
log.Errorln("unable to write stream response for actor handler", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

View File

@@ -0,0 +1,166 @@
package controllers
import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/requests"
"github.com/owncast/owncast/core/data"
)
const (
followersPageSize = 50
)
// FollowersHandler will return the list of remote followers on the Fediverse.
func FollowersHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
var response interface{}
var err error
if r.URL.Query().Get("page") != "" {
response, err = getFollowersPage(r.URL.Query().Get("page"), r)
} else {
response, err = getInitialFollowersRequest(r)
}
if response == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if err != nil {
_, _ = w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
pathComponents := strings.Split(r.URL.Path, "/")
accountName := pathComponents[3]
actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
publicKey := crypto.GetPublicKey(actorIRI)
if err := requests.WriteStreamResponse(response.(vocab.Type), w, publicKey); err != nil {
log.Errorln("unable to write stream response for followers handler", err)
}
}
func getInitialFollowersRequest(r *http.Request) (vocab.ActivityStreamsOrderedCollection, error) {
followerCount, _ := persistence.GetFollowerCount()
collection := streams.NewActivityStreamsOrderedCollection()
idProperty := streams.NewJSONLDIdProperty()
id, err := createPageURL(r, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to create followers page property")
}
idProperty.SetIRI(id)
collection.SetJSONLDId(idProperty)
totalItemsProperty := streams.NewActivityStreamsTotalItemsProperty()
totalItemsProperty.Set(int(followerCount))
collection.SetActivityStreamsTotalItems(totalItemsProperty)
first := streams.NewActivityStreamsFirstProperty()
page := "1"
firstIRI, err := createPageURL(r, &page)
if err != nil {
return nil, errors.Wrap(err, "unable to create first page property")
}
first.SetIRI(firstIRI)
collection.SetActivityStreamsFirst(first)
return collection, nil
}
func getFollowersPage(page string, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
pageInt, err := strconv.Atoi(page)
if err != nil {
return nil, errors.Wrap(err, "unable to parse page number")
}
followerCount, err := persistence.GetFollowerCount()
if err != nil {
return nil, errors.Wrap(err, "unable to get follower count")
}
followers, err := persistence.GetFederationFollowers(followersPageSize, (pageInt-1)*followersPageSize)
if err != nil {
return nil, errors.Wrap(err, "unable to get federation followers")
}
collectionPage := streams.NewActivityStreamsOrderedCollectionPage()
idProperty := streams.NewJSONLDIdProperty()
id, err := createPageURL(r, &page)
if err != nil {
return nil, errors.Wrap(err, "unable to create followers page ID")
}
idProperty.SetIRI(id)
collectionPage.SetJSONLDId(idProperty)
orderedItems := streams.NewActivityStreamsOrderedItemsProperty()
for _, follower := range followers {
u, _ := url.Parse(follower.ActorIRI)
orderedItems.AppendIRI(u)
}
collectionPage.SetActivityStreamsOrderedItems(orderedItems)
partOf := streams.NewActivityStreamsPartOfProperty()
partOfIRI, err := createPageURL(r, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to create partOf property for followers page")
}
partOf.SetIRI(partOfIRI)
collectionPage.SetActivityStreamsPartOf(partOf)
if pageInt*followersPageSize < int(followerCount) {
next := streams.NewActivityStreamsNextProperty()
nextPage := fmt.Sprintf("%d", pageInt+1)
nextIRI, err := createPageURL(r, &nextPage)
if err != nil {
return nil, errors.Wrap(err, "unable to create next page property")
}
next.SetIRI(nextIRI)
collectionPage.SetActivityStreamsNext(next)
}
return collectionPage, nil
}
func createPageURL(r *http.Request, page *string) (*url.URL, error) {
domain := data.GetServerURL()
if domain == "" {
return nil, errors.New("unable to get server URL")
}
pageURL, err := url.Parse(domain)
if err != nil {
return nil, errors.Wrap(err, "unable to parse server URL")
}
if page != nil {
query := pageURL.Query()
query.Add("page", *page)
pageURL.RawQuery = query.Encode()
}
pageURL.Path = r.URL.Path
return pageURL, nil
}

View File

@@ -0,0 +1,56 @@
package controllers
import (
"io"
"net/http"
"strings"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/inbox"
"github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
)
// InboxHandler handles inbound federated requests.
func InboxHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
acceptInboxRequest(w, r)
} else {
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
func acceptInboxRequest(w http.ResponseWriter, r *http.Request) {
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
urlPathComponents := strings.Split(r.URL.Path, "/")
var forLocalAccount string
if len(urlPathComponents) == 5 {
forLocalAccount = urlPathComponents[3]
} else {
log.Errorln("Unable to determine username from url path")
w.WriteHeader(http.StatusNotFound)
return
}
// The account this request is for must match the account name we have set
// for federation.
if forLocalAccount != data.GetFederationUsername() {
w.WriteHeader(http.StatusNotFound)
return
}
data, err := io.ReadAll(r.Body)
if err != nil {
log.Errorln("Unable to read inbox request payload", err)
return
}
inboxRequest := apmodels.InboxRequest{Request: r, ForLocalAccount: forLocalAccount, Body: data}
inbox.AddToQueue(inboxRequest)
w.WriteHeader(http.StatusAccepted)
}

View File

@@ -0,0 +1,285 @@
package controllers
import (
"fmt"
"net/http"
"net/url"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/requests"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
)
// NodeInfoController returns the V1 node info response.
func NodeInfoController(w http.ResponseWriter, r *http.Request) {
type links struct {
Rel string `json:"rel"`
Href string `json:"href"`
}
type response struct {
Links []links `json:"links"`
}
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
serverURL := data.GetServerURL()
if serverURL == "" {
w.WriteHeader(http.StatusNotFound)
return
}
v2, err := url.Parse(serverURL)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
v2.Path = "nodeinfo/2.0"
res := response{
Links: []links{
{
Rel: "http://nodeinfo.diaspora.software/ns/schema/2.0",
Href: v2.String(),
},
},
}
if err := writeResponse(res, w); err != nil {
log.Errorln(err)
}
}
// NodeInfoV2Controller returns the V2 node info response.
func NodeInfoV2Controller(w http.ResponseWriter, r *http.Request) {
type software struct {
Name string `json:"name"`
Version string `json:"version"`
}
type users struct {
Total int `json:"total"`
ActiveMonth int `json:"activeMonth"`
ActiveHalfyear int `json:"activeHalfyear"`
}
type usage struct {
Users users `json:"users"`
LocalPosts int `json:"localPosts"`
}
type response struct {
Version string `json:"version"`
Software software `json:"software"`
Protocols []string `json:"protocols"`
Usage usage `json:"usage"`
OpenRegistrations bool `json:"openRegistrations"`
}
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
localPostCount, _ := persistence.GetLocalPostCount()
res := response{
Version: "2.0",
Software: software{
Name: "Owncast",
Version: config.VersionNumber,
},
Usage: usage{
Users: users{
Total: 1,
ActiveMonth: 1,
ActiveHalfyear: 1,
},
LocalPosts: int(localPostCount),
},
OpenRegistrations: false,
Protocols: []string{"activitypub"},
}
if err := writeResponse(res, w); err != nil {
log.Errorln(err)
}
}
// XNodeInfo2Controller returns the x-nodeinfo2.
func XNodeInfo2Controller(w http.ResponseWriter, r *http.Request) {
type Organization struct {
Name string `json:"name"`
Contact string `json:"contact"`
}
type Server struct {
BaseURL string `json:"baseUrl"`
Version string `json:"version"`
Name string `json:"name"`
Software string `json:"software"`
}
type Services struct {
Outbound []string `json:"outbound"`
Inbound []string `json:"inbound"`
}
type Users struct {
ActiveWeek int `json:"activeWeek"`
Total int `json:"total"`
ActiveMonth int `json:"activeMonth"`
ActiveHalfyear int `json:"activeHalfyear"`
}
type Usage struct {
Users Users `json:"users"`
LocalPosts int `json:"localPosts"`
LocalComments int `json:"localComments"`
}
type response struct {
Organization Organization `json:"organization"`
Server Server `json:"server"`
Services Services `json:"services"`
Protocols []string `json:"protocols"`
Version string `json:"version"`
OpenRegistrations bool `json:"openRegistrations"`
Usage Usage `json:"usage"`
}
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
serverURL := data.GetServerURL()
if serverURL == "" {
w.WriteHeader(http.StatusNotFound)
return
}
localPostCount, _ := persistence.GetLocalPostCount()
res := &response{
Organization: Organization{
Name: data.GetServerName(),
Contact: serverURL,
},
Server: Server{
BaseURL: serverURL,
Version: config.VersionNumber,
Name: "owncast",
Software: "owncast",
},
Services: Services{
Inbound: []string{"activitypub"},
Outbound: []string{"activitypub"},
},
Protocols: []string{"activitypub"},
Version: config.VersionNumber,
Usage: Usage{
Users: Users{
ActiveWeek: 1,
Total: 1,
ActiveMonth: 1,
ActiveHalfyear: 1,
},
LocalPosts: int(localPostCount),
LocalComments: 0,
},
}
if err := writeResponse(res, w); err != nil {
log.Errorln(err)
}
}
// InstanceV1Controller returns the v1 instance details.
func InstanceV1Controller(w http.ResponseWriter, r *http.Request) {
type Stats struct {
UserCount int `json:"user_count"`
StatusCount int `json:"status_count"`
DomainCount int `json:"domain_count"`
}
type response struct {
URI string `json:"uri"`
Title string `json:"title"`
ShortDescription string `json:"short_description"`
Description string `json:"description"`
Version string `json:"version"`
Stats Stats `json:"stats"`
Thumbnail string `json:"thumbnail"`
Languages []string `json:"languages"`
Registrations bool `json:"registrations"`
ApprovalRequired bool `json:"approval_required"`
InvitesEnabled bool `json:"invites_enabled"`
}
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
serverURL := data.GetServerURL()
if serverURL == "" {
w.WriteHeader(http.StatusNotFound)
return
}
thumbnail, err := url.Parse(serverURL)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
thumbnail.Path = "/logo/external"
localPostCount, _ := persistence.GetLocalPostCount()
res := response{
URI: serverURL,
Title: data.GetServerName(),
ShortDescription: data.GetServerSummary(),
Description: data.GetServerSummary(),
Version: config.GetReleaseString(),
Stats: Stats{
UserCount: 1,
StatusCount: int(localPostCount),
DomainCount: 0,
},
Thumbnail: thumbnail.String(),
Registrations: false,
ApprovalRequired: false,
InvitesEnabled: false,
}
if err := writeResponse(res, w); err != nil {
log.Errorln(err)
}
}
func writeResponse(payload interface{}, w http.ResponseWriter) error {
accountName := data.GetDefaultFederationUsername()
actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
publicKey := crypto.GetPublicKey(actorIRI)
return requests.WritePayloadResponse(payload, w, publicKey)
}
// HostMetaController points to webfinger.
func HostMetaController(w http.ResponseWriter, r *http.Request) {
serverURL := data.GetServerURL()
if serverURL == "" {
w.WriteHeader(http.StatusNotFound)
return
}
res := fmt.Sprintf(`<?xml version="1.0" encoding="UTF-8"?>
<XRD xmlns="http://docs.oasis-open.org/ns/xri/xrd-1.0">
<Link rel="lrdd" type="application/json" template="%s/.well-known/webfinger?resource={uri}"/>
</XRD>`, serverURL)
if _, err := w.Write([]byte(res)); err != nil {
log.Errorln(err)
}
}

View File

@@ -0,0 +1,42 @@
package controllers
import (
"net/http"
"strings"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/requests"
"github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
)
// ObjectHandler handles requests for a single federated ActivityPub object.
func ObjectHandler(w http.ResponseWriter, r *http.Request) {
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// If private federation mode is enabled do not allow access to objects.
if data.GetFederationIsPrivate() {
w.WriteHeader(http.StatusNotFound)
return
}
iri := strings.Join([]string{strings.TrimSuffix(data.GetServerURL(), "/"), r.URL.Path}, "")
object, _, _, err := persistence.GetObjectByIRI(iri)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
accountName := data.GetDefaultFederationUsername()
actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
publicKey := crypto.GetPublicKey(actorIRI)
if err := requests.WriteResponse([]byte(object), w, publicKey); err != nil {
log.Errorln(err)
}
}

View File

@@ -0,0 +1,156 @@
package controllers
import (
"fmt"
"net/http"
"strconv"
"strings"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/requests"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
const (
outboxPageSize = 50
)
// OutboxHandler will handle requests for the local ActivityPub outbox.
func OutboxHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
var response interface{}
var err error
if r.URL.Query().Get("page") != "" {
response, err = getOutboxPage(r.URL.Query().Get("page"), r)
} else {
response, err = getInitialOutboxHandler(r)
}
if response == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if err != nil {
_, _ = w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
pathComponents := strings.Split(r.URL.Path, "/")
accountName := pathComponents[3]
actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
publicKey := crypto.GetPublicKey(actorIRI)
if err := requests.WriteStreamResponse(response.(vocab.Type), w, publicKey); err != nil {
log.Errorln("unable to write stream response for outbox handler", err)
}
}
// ActorObjectHandler will handle the request for a single ActivityPub object.
func ActorObjectHandler(w http.ResponseWriter, r *http.Request) {
object, _, _, err := persistence.GetObjectByIRI(r.URL.Path)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
// controllers.WriteSimpleResponse(w, false, err.Error())
}
if _, err := w.Write([]byte(object)); err != nil {
log.Errorln(err)
}
}
func getInitialOutboxHandler(r *http.Request) (vocab.ActivityStreamsOrderedCollection, error) {
collection := streams.NewActivityStreamsOrderedCollection()
idProperty := streams.NewJSONLDIdProperty()
id, err := createPageURL(r, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to create followers page property")
}
idProperty.SetIRI(id)
collection.SetJSONLDId(idProperty)
totalPosts, err := persistence.GetOutboxPostCount()
if err != nil {
return nil, errors.Wrap(err, "unable to get outbox post count")
}
totalItemsProperty := streams.NewActivityStreamsTotalItemsProperty()
totalItemsProperty.Set(int(totalPosts))
collection.SetActivityStreamsTotalItems(totalItemsProperty)
first := streams.NewActivityStreamsFirstProperty()
page := "1"
firstIRI, err := createPageURL(r, &page)
if err != nil {
return nil, errors.Wrap(err, "unable to create first page property")
}
first.SetIRI(firstIRI)
collection.SetActivityStreamsFirst(first)
return collection, nil
}
func getOutboxPage(page string, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
pageInt, err := strconv.Atoi(page)
if err != nil {
return nil, errors.Wrap(err, "unable to parse page number")
}
postCount, err := persistence.GetOutboxPostCount()
if err != nil {
return nil, errors.Wrap(err, "unable to get outbox post count")
}
collectionPage := streams.NewActivityStreamsOrderedCollectionPage()
idProperty := streams.NewJSONLDIdProperty()
id, err := createPageURL(r, &page)
if err != nil {
return nil, errors.Wrap(err, "unable to create followers page ID")
}
idProperty.SetIRI(id)
collectionPage.SetJSONLDId(idProperty)
orderedItems := streams.NewActivityStreamsOrderedItemsProperty()
outboxItems, err := persistence.GetOutbox(outboxPageSize, (pageInt-1)*outboxPageSize)
if err != nil {
return nil, errors.Wrap(err, "unable to get federation followers")
}
orderedItems.AppendActivityStreamsOrderedCollection(outboxItems)
collectionPage.SetActivityStreamsOrderedItems(orderedItems)
partOf := streams.NewActivityStreamsPartOfProperty()
partOfIRI, err := createPageURL(r, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to create partOf property for outbox page")
}
partOf.SetIRI(partOfIRI)
collectionPage.SetActivityStreamsPartOf(partOf)
if pageInt*followersPageSize < int(postCount) {
next := streams.NewActivityStreamsNextProperty()
nextPage := fmt.Sprintf("%d", pageInt+1)
nextIRI, err := createPageURL(r, &nextPage)
if err != nil {
return nil, errors.Wrap(err, "unable to create next page property")
}
next.SetIRI(nextIRI)
collectionPage.SetActivityStreamsNext(next)
}
return collectionPage, nil
}

View File

@@ -0,0 +1,60 @@
package controllers
import (
"encoding/json"
"net/http"
"strings"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
// WebfingerHandler will handle webfinger lookup requests.
func WebfingerHandler(w http.ResponseWriter, r *http.Request) {
if !data.GetFederationEnabled() {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
resource := r.URL.Query().Get("resource")
resourceComponents := strings.Split(resource, ":")
account := resourceComponents[1]
userComponents := strings.Split(account, "@")
if len(userComponents) < 2 {
return
}
host := userComponents[1]
user := userComponents[0]
if _, valid := data.GetFederatedInboxMap()[user]; !valid {
// User is not valid
w.WriteHeader(http.StatusNotFound)
log.Println("Webfinger request rejected")
return
}
// If the webfinger request doesn't match our server then it
// should be rejected.
instanceHostString := data.GetServerURL()
if instanceHostString == "" {
w.WriteHeader(http.StatusNotImplemented)
return
}
instanceHostString = utils.GetHostnameFromURLString(instanceHostString)
if instanceHostString == "" || instanceHostString != host {
w.WriteHeader(http.StatusNotImplemented)
return
}
webfingerResponse := apmodels.MakeWebfingerResponse(user, user, host)
w.Header().Set("Content-Type", "application/jrd+json")
if err := json.NewEncoder(w).Encode(webfingerResponse); err != nil {
log.Errorln("unable to write webfinger response", err)
}
}

View File

@@ -0,0 +1,78 @@
package crypto
import (
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"errors"
"net/url"
"github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
)
// GetPublicKey will return the public key for the provided actor.
func GetPublicKey(actorIRI *url.URL) PublicKey {
key := data.GetPublicKey()
idURL, err := url.Parse(actorIRI.String() + "#main-key")
if err != nil {
log.Errorln("unable to parse actor iri string", idURL, err)
}
return PublicKey{
ID: idURL,
Owner: actorIRI,
PublicKeyPem: key,
}
}
// GetPrivateKey will return the internal server private key.
func GetPrivateKey() *rsa.PrivateKey {
key := data.GetPrivateKey()
block, _ := pem.Decode([]byte(key))
if block == nil {
log.Errorln(errors.New("failed to parse PEM block containing the key"))
return nil
}
priv, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
log.Errorln("unable to parse private key", err)
return nil
}
return priv
}
// GenerateKeys will generate the private/public key pair needed for federation.
func GenerateKeys() ([]byte, []byte, error) {
// generate key
privatekey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
log.Errorln("Cannot generate RSA key", err)
return nil, nil, err
}
publickey := &privatekey.PublicKey
privateKeyBytes := x509.MarshalPKCS1PrivateKey(privatekey)
privateKeyBlock := &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: privateKeyBytes,
}
privatePem := pem.EncodeToMemory(privateKeyBlock)
publicKeyBytes, err := x509.MarshalPKIXPublicKey(publickey)
if err != nil {
log.Errorln("error when dumping publickey:", err)
return nil, nil, err
}
publicKeyBlock := &pem.Block{
Type: "PUBLIC KEY",
Bytes: publicKeyBytes,
}
publicPem := pem.EncodeToMemory(publicKeyBlock)
return privatePem, publicPem, nil
}

View File

@@ -0,0 +1,10 @@
package crypto
import "net/url"
// PublicKey represents a public key with associated ownership.
type PublicKey struct {
ID *url.URL `json:"id"`
Owner *url.URL `json:"owner"`
PublicKeyPem string `json:"publicKeyPem"`
}

View File

@@ -0,0 +1,70 @@
package crypto
import (
"crypto"
"net/http"
"net/url"
"time"
"github.com/go-fed/httpsig"
)
// SignResponse will sign a response using the provided response body and public key.
func SignResponse(w http.ResponseWriter, body []byte, publicKey PublicKey) error {
privateKey := GetPrivateKey()
return signResponse(privateKey, *publicKey.ID, body, w)
}
func signResponse(privateKey crypto.PrivateKey, pubKeyID url.URL, body []byte, w http.ResponseWriter) error {
prefs := []httpsig.Algorithm{httpsig.RSA_SHA256}
digestAlgorithm := httpsig.DigestSha256
// The "Date" and "Digest" headers must already be set on r, as well as r.URL.
headersToSign := []string{}
if body != nil {
headersToSign = append(headersToSign, "digest")
}
signer, _, err := httpsig.NewSigner(prefs, digestAlgorithm, headersToSign, httpsig.Signature, 0)
if err != nil {
return err
}
// If r were a http.ResponseWriter, call SignResponse instead.
return signer.SignResponse(privateKey, pubKeyID.String(), w, body)
}
// SignRequest will sign an ounbound request given the provided body.
func SignRequest(req *http.Request, body []byte, actorIRI *url.URL) error {
publicKey := GetPublicKey(actorIRI)
privateKey := GetPrivateKey()
return signRequest(privateKey, publicKey.ID.String(), body, req)
}
func signRequest(privateKey crypto.PrivateKey, pubKeyID string, body []byte, r *http.Request) error {
prefs := []httpsig.Algorithm{httpsig.RSA_SHA256}
digestAlgorithm := httpsig.DigestSha256
date := time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05 GMT")
r.Header["Date"] = []string{date}
r.Header["Host"] = []string{r.URL.Host}
r.Header["Accept"] = []string{`application/ld+json; profile="https://www.w3.org/ns/activitystreams"`}
// The "Date" and "Digest" headers must already be set on r, as well as r.URL.
headersToSign := []string{httpsig.RequestTarget, "host", "date"}
if body != nil {
headersToSign = append(headersToSign, "digest")
}
signer, _, err := httpsig.NewSigner(prefs, digestAlgorithm, headersToSign, httpsig.Signature, 0)
if err != nil {
return err
}
// If r were a http.ResponseWriter, call SignResponse instead.
return signer.SignRequest(privateKey, pubKeyID, r, body)
}

View File

@@ -0,0 +1,40 @@
package inbox
import (
"context"
"time"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/core/chat/events"
"github.com/pkg/errors"
)
func handleAnnounceRequest(c context.Context, activity vocab.ActivityStreamsAnnounce) error {
object := activity.GetActivityStreamsObject()
actorReference := activity.GetActivityStreamsActor()
objectIRI := object.At(0).GetIRI().String()
actorIRI := actorReference.At(0).GetIRI().String()
if hasPreviouslyhandled, err := persistence.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, events.FediverseEngagementRepost); hasPreviouslyhandled || err != nil {
return errors.Wrap(err, "inbound activity of share/re-post has already been handled")
}
// Shares need to match a post we had already sent.
_, isLiveNotification, timestamp, err := persistence.GetObjectByIRI(objectIRI)
if err != nil {
return errors.Wrap(err, "Could not find post locally")
}
// Don't allow old activities to be liked
if time.Since(timestamp) > maxAgeForEngagement {
return errors.New("Activity is too old to be shared")
}
// Save as an accepted activity
if err := persistence.SaveInboundFediverseActivity(objectIRI, actorIRI, events.FediverseEngagementRepost, time.Now()); err != nil {
return errors.Wrap(err, "unable to save inbound share/re-post activity")
}
return handleEngagementActivity(events.FediverseEngagementRepost, isLiveNotification, actorReference, events.FediverseEngagementRepost)
}

62
activitypub/inbox/chat.go Normal file
View File

@@ -0,0 +1,62 @@
package inbox
import (
"fmt"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/resolvers"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
)
func handleEngagementActivity(eventType events.EventType, isLiveNotification bool, actorReference vocab.ActivityStreamsActorProperty, action string) error {
// Do nothing if displaying engagement actions has been turned off.
if !data.GetFederationShowEngagement() {
return nil
}
// Do nothing if chat is disabled
if data.GetChatDisabled() {
return nil
}
// Get actor of the action
actor, _ := resolvers.GetResolvedActorFromActorProperty(actorReference)
// Send chat message
actorName := actor.Name
if actorName == "" {
actorName = actor.Username
}
actorIRI := actorReference.Begin().GetIRI().String()
userPrefix := fmt.Sprintf("%s ", actorName)
var suffix string
if isLiveNotification && action == events.FediverseEngagementLike {
suffix = "liked that this stream went live."
} else if action == events.FediverseEngagementLike {
suffix = fmt.Sprintf("liked a post from %s.", data.GetServerName())
} else if isLiveNotification && action == events.FediverseEngagementRepost {
suffix = "shared this stream with their followers."
} else if action == events.FediverseEngagementRepost {
suffix = fmt.Sprintf("shared a post from %s.", data.GetServerName())
} else if action == events.FediverseEngagementFollow {
suffix = "followed this stream."
} else {
return fmt.Errorf("could not handle event for sending to chat: %s", action)
}
body := fmt.Sprintf("%s %s", userPrefix, suffix)
var image *string
if actor.Image != nil {
s := actor.Image.String()
image = &s
}
if err := chat.SendFediverseAction(eventType, actor.FullUsername, image, body, actorIRI); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,7 @@
package inbox
import "time"
const (
maxAgeForEngagement = time.Hour * 36
)

View File

@@ -0,0 +1,88 @@
package inbox
import (
"context"
"fmt"
"time"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/requests"
"github.com/owncast/owncast/activitypub/resolvers"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
func handleFollowInboxRequest(c context.Context, activity vocab.ActivityStreamsFollow) error {
follow, err := resolvers.MakeFollowRequest(c, activity)
if err != nil {
log.Errorln("unable to create follow inbox request", err)
return err
}
if follow == nil {
return fmt.Errorf("unable to handle request")
}
approved := !data.GetFederationIsPrivate()
followRequest := *follow
if err := persistence.AddFollow(followRequest, approved); err != nil {
log.Errorln("unable to save follow request", err)
return err
}
localAccountName := data.GetDefaultFederationUsername()
if approved {
if err := requests.SendFollowAccept(follow.Inbox, follow.FollowRequestIri, localAccountName); err != nil {
log.Errorln("unable to send follow accept", err)
return err
}
}
// Save as an accepted activity
actorReference := activity.GetActivityStreamsActor()
object := activity.GetActivityStreamsObject()
objectIRI := object.At(0).GetIRI().String()
actorIRI := actorReference.At(0).GetIRI().String()
// If this request is approved and we have not previously sent an action to
// chat due to a previous follow request, then do so.
hasPreviouslyhandled := true // Default so we don't send anything if it fails.
if approved {
hasPreviouslyhandled, err = persistence.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, events.FediverseEngagementFollow)
if err != nil {
log.Errorln("error checking for previously handled follow activity", err)
}
}
// Save this follow action to our activities table.
if err := persistence.SaveInboundFediverseActivity(objectIRI, actorIRI, events.FediverseEngagementFollow, time.Now()); err != nil {
return errors.Wrap(err, "unable to save inbound share/re-post activity")
}
// Send action to chat if it has not been previously handled.
if !hasPreviouslyhandled {
return handleEngagementActivity(events.FediverseEngagementFollow, false, actorReference, events.FediverseEngagementFollow)
}
return nil
}
func handleUnfollowRequest(c context.Context, activity vocab.ActivityStreamsUndo) error {
request := resolvers.MakeUnFollowRequest(c, activity)
if request == nil {
log.Errorf("unable to handle unfollow request")
return errors.New("unable to handle unfollow request")
}
unfollowRequest := *request
log.Traceln("unfollow request:", unfollowRequest)
return persistence.RemoveFollow(unfollowRequest)
}

40
activitypub/inbox/like.go Normal file
View File

@@ -0,0 +1,40 @@
package inbox
import (
"context"
"time"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/core/chat/events"
"github.com/pkg/errors"
)
func handleLikeRequest(c context.Context, activity vocab.ActivityStreamsLike) error {
object := activity.GetActivityStreamsObject()
actorReference := activity.GetActivityStreamsActor()
objectIRI := object.At(0).GetIRI().String()
actorIRI := actorReference.At(0).GetIRI().String()
if hasPreviouslyhandled, err := persistence.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, events.FediverseEngagementLike); hasPreviouslyhandled || err != nil {
return errors.Wrap(err, "inbound activity of like has already been handled")
}
// Likes need to match a post we had already sent.
_, isLiveNotification, timestamp, err := persistence.GetObjectByIRI(objectIRI)
if err != nil {
return errors.Wrap(err, "Could not find post locally")
}
// Don't allow old activities to be liked
if time.Since(timestamp) > maxAgeForEngagement {
return errors.New("Activity is too old to be liked")
}
// Save as an accepted activity
if err := persistence.SaveInboundFediverseActivity(objectIRI, actorIRI, events.FediverseEngagementLike, time.Now()); err != nil {
return errors.Wrap(err, "unable to save inbound like activity")
}
return handleEngagementActivity(events.FediverseEngagementLike, isLiveNotification, actorReference, events.FediverseEngagementLike)
}

27
activitypub/inbox/undo.go Normal file
View File

@@ -0,0 +1,27 @@
package inbox
import (
"context"
log "github.com/sirupsen/logrus"
"github.com/go-fed/activity/streams/vocab"
)
func handleUndoInboxRequest(c context.Context, activity vocab.ActivityStreamsUndo) error {
// Determine if this is an undo of a follow, favorite, announce, etc.
o := activity.GetActivityStreamsObject()
for iter := o.Begin(); iter != o.End(); iter = iter.Next() {
if iter.IsActivityStreamsFollow() {
// This is an Unfollow request
if err := handleUnfollowRequest(c, activity); err != nil {
return err
}
} else {
log.Traceln("Undo", iter.GetType().GetTypeName(), "ignored")
return nil
}
}
return nil
}

View File

@@ -0,0 +1,25 @@
package inbox
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/resolvers"
log "github.com/sirupsen/logrus"
)
func handleUpdateRequest(c context.Context, activity vocab.ActivityStreamsUpdate) error {
// We only care about update events to followers.
if !activity.GetActivityStreamsObject().At(0).IsActivityStreamsPerson() {
return nil
}
actor, err := resolvers.GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
if err != nil {
log.Errorln(err)
return err
}
return persistence.UpdateFollower(actor.ActorIri.String(), actor.Inbox.String(), actor.Name, actor.FullUsername, actor.Image.String())
}

130
activitypub/inbox/worker.go Normal file
View File

@@ -0,0 +1,130 @@
package inbox
import (
"context"
"crypto/x509"
"encoding/pem"
"net/http"
"net/url"
"strings"
"github.com/pkg/errors"
"github.com/go-fed/httpsig"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/resolvers"
"github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
)
func handle(request apmodels.InboxRequest) {
if verified, err := Verify(request.Request); err != nil {
log.Debugln("Error in attempting to verify request", err)
return
} else if !verified {
log.Errorln("Request failed verification", err)
return
}
// c := context.WithValue(context.Background(), "account", request.ForLocalAccount) //nolint
if err := resolvers.Resolve(context.Background(), request.Body, handleUpdateRequest, handleFollowInboxRequest, handleLikeRequest, handleAnnounceRequest, handleUndoInboxRequest); err != nil {
log.Errorln("resolver error:", err)
}
}
// Verify will Verify the http signature of an inbound request as well as
// check it against the list of blocked domains.
func Verify(request *http.Request) (bool, error) {
verifier, err := httpsig.NewVerifier(request)
if err != nil {
return false, errors.Wrap(err, "failed to create key verifier for request")
}
pubKeyID, err := url.Parse(verifier.KeyId())
if err != nil {
return false, errors.Wrap(err, "failed to parse key to get key ID")
}
// Force federation only via servers using https.
if pubKeyID.Scheme != "https" {
return false, errors.New("federated servers must use https: " + pubKeyID.String())
}
signature := request.Header.Get("signature")
var algorithmString string
signatureComponents := strings.Split(signature, ",")
for _, component := range signatureComponents {
kv := strings.Split(component, "=")
if kv[0] == "algorithm" {
algorithmString = kv[1]
break
}
}
algorithmString = strings.Trim(algorithmString, "\"")
if algorithmString == "" {
return false, errors.New("Unable to determine algorithm to verify request")
}
actor, err := resolvers.GetResolvedActorFromIRI(pubKeyID.String())
if err != nil {
return false, errors.Wrap(err, "failed to resolve actor from IRI to fetch key")
}
// Test to see if the actor is in the list of blocked federated domains.
if isBlockedDomain(actor.ActorIri.Hostname()) {
return false, errors.New("domain is blocked")
}
// If actor is specifically blocked, then fail validation.
if blocked, err := isBlockedActor(actor.ActorIri); err != nil || blocked {
return false, err
}
key := actor.W3IDSecurityV1PublicKey.Begin().Get().GetW3IDSecurityV1PublicKeyPem().Get()
block, _ := pem.Decode([]byte(key))
if block == nil {
log.Errorln("failed to parse PEM block containing the public key")
return false, errors.New("failed to parse PEM block containing the public key")
}
parsedKey, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
log.Errorln("failed to parse DER encoded public key: " + err.Error())
return false, errors.Wrap(err, "failed to parse DER encoded public key")
}
var algorithm httpsig.Algorithm = httpsig.Algorithm(algorithmString)
// The verifier will verify the Digest in addition to the HTTP signature
if err := verifier.Verify(parsedKey, algorithm); err != nil {
log.Warnln("verification error for", pubKeyID, err)
return false, errors.Wrap(err, "verification error: "+pubKeyID.String())
}
return true, nil
}
func isBlockedDomain(domain string) bool {
blockedDomains := data.GetBlockedFederatedDomains()
for _, blockedDomain := range blockedDomains {
if strings.Contains(domain, blockedDomain) {
return true
}
}
return false
}
func isBlockedActor(actorIRI *url.URL) (bool, error) {
blockedactor, err := persistence.GetFollower(actorIRI.String())
if blockedactor != nil && blockedactor.DisabledAt != nil {
return true, errors.Wrap(err, "remote actor is blocked")
}
return false, nil
}

View File

@@ -0,0 +1,100 @@
package inbox
import (
"net/url"
"testing"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/core/data"
)
func makeFakePerson() vocab.ActivityStreamsPerson {
iri, _ := url.Parse("https://freedom.eagle/user/mrfoo")
name := "Mr Foo"
username := "foodawg"
inbox, _ := url.Parse("https://fake.fediverse.server/user/mrfoo/inbox")
userAvatarURL, _ := url.Parse("https://fake.fediverse.server/user/mrfoo/avatar.png")
person := streams.NewActivityStreamsPerson()
id := streams.NewJSONLDIdProperty()
id.Set(iri)
person.SetJSONLDId(id)
nameProperty := streams.NewActivityStreamsNameProperty()
nameProperty.AppendXMLSchemaString(name)
person.SetActivityStreamsName(nameProperty)
preferredUsernameProperty := streams.NewActivityStreamsPreferredUsernameProperty()
preferredUsernameProperty.SetXMLSchemaString(username)
person.SetActivityStreamsPreferredUsername(preferredUsernameProperty)
inboxProp := streams.NewActivityStreamsInboxProperty()
inboxProp.SetIRI(inbox)
person.SetActivityStreamsInbox(inboxProp)
image := streams.NewActivityStreamsImage()
imgProp := streams.NewActivityStreamsUrlProperty()
imgProp.AppendIRI(userAvatarURL)
image.SetActivityStreamsUrl(imgProp)
icon := streams.NewActivityStreamsIconProperty()
icon.AppendActivityStreamsImage(image)
person.SetActivityStreamsIcon(icon)
return person
}
func TestMain(m *testing.M) {
data.SetupPersistence(":memory:")
data.SetServerURL("https://my.cool.site.biz")
persistence.Setup(data.GetDatastore())
m.Run()
}
func TestBlockedDomains(t *testing.T) {
person := makeFakePerson()
data.SetBlockedFederatedDomains([]string{"freedom.eagle", "guns.life"})
if len(data.GetBlockedFederatedDomains()) != 2 {
t.Error("Blocked federated domains is not set correctly")
}
for _, domain := range data.GetBlockedFederatedDomains() {
if domain == person.GetJSONLDId().GetIRI().Host {
return
}
}
t.Error("Failed to catch blocked domain")
}
func TestBlockedActors(t *testing.T) {
person := makeFakePerson()
persistence.AddFollow(apmodels.ActivityPubActor{
ActorIri: person.GetJSONLDId().GetIRI(),
Inbox: person.GetJSONLDId().GetIRI(),
FollowRequestIri: person.GetJSONLDId().GetIRI(),
}, false)
persistence.BlockOrRejectFollower(person.GetJSONLDId().GetIRI().String())
blocked, err := isBlockedActor(person.GetJSONLDId().GetIRI())
if err != nil {
t.Error(err)
return
}
if !blocked {
t.Error("Failed to block actor")
}
failedBlockIRI, _ := url.Parse("https://freedom.eagle/user/mrbar")
failedBlock, err := isBlockedActor(failedBlockIRI)
if failedBlock {
t.Error("Invalid blocking of unblocked actor IRI")
}
}

View File

@@ -0,0 +1,44 @@
package inbox
import (
"github.com/owncast/owncast/activitypub/apmodels"
log "github.com/sirupsen/logrus"
)
const (
// InboxWorkerPoolSize defines the number of concurrent ActivityPub handlers.
InboxWorkerPoolSize = 10
)
// Job struct bundling the ActivityPub and the payload in one struct.
type Job struct {
request apmodels.InboxRequest
}
var queue chan Job
// InitInboxWorkerPool starts n go routines that await ActivityPub jobs.
func InitInboxWorkerPool() {
queue = make(chan Job)
// start workers
for i := 1; i <= InboxWorkerPoolSize; i++ {
go worker(i, queue)
}
}
// AddToQueue will queue up an outbound http request.
func AddToQueue(req apmodels.InboxRequest) {
log.Tracef("Queued request for ActivityPub inbox handler")
queue <- Job{req}
}
func worker(workerID int, queue <-chan Job) {
log.Debugf("Started ActivityPub worker %d", workerID)
for job := range queue {
handle(job.request)
log.Tracef("Done with ActivityPub inbox handler using worker %d", workerID)
}
}

View File

@@ -0,0 +1,245 @@
package outbox
import (
"errors"
"fmt"
"net/url"
"path/filepath"
"regexp"
"strings"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/requests"
"github.com/owncast/owncast/activitypub/workerpool"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
"github.com/teris-io/shortid"
)
// SendLive will send all followers the message saying you started a live stream.
func SendLive() error {
textContent := data.GetFederationGoLiveMessage()
// If the message is empty then do not send it.
if textContent == "" {
return nil
}
tagStrings := []string{}
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
tagProp := streams.NewActivityStreamsTagProperty()
for _, tagString := range data.GetServerMetadataTags() {
tagWithoutSpecialCharacters := reg.ReplaceAllString(tagString, "")
hashtag := apmodels.MakeHashtag(tagWithoutSpecialCharacters)
tagProp.AppendTootHashtag(hashtag)
tagString := getHashtagLinkHTMLFromTagString(tagWithoutSpecialCharacters)
tagStrings = append(tagStrings, tagString)
}
// Manually add Owncast hashtag if it doesn't already exist so it shows up
// in Owncast search results.
// We can remove this down the road, but it'll be nice for now.
if _, exists := utils.FindInSlice(tagStrings, "owncast"); !exists {
hashtag := apmodels.MakeHashtag("owncast")
tagProp.AppendTootHashtag(hashtag)
}
tagsString := strings.Join(tagStrings, " ")
var streamTitle string
if title := data.GetStreamTitle(); title != "" {
streamTitle = fmt.Sprintf("<p>%s</p>", title)
}
textContent = fmt.Sprintf("<p>%s</p><p>%s</p><p>%s</p><a href=\"%s\">%s</a>", textContent, streamTitle, tagsString, data.GetServerURL(), data.GetServerURL())
activity, _, note, noteID := createBaseOutboundMessage(textContent)
note.SetActivityStreamsTag(tagProp)
// Attach an image along with the Federated message.
previewURL, err := url.Parse(data.GetServerURL())
if err == nil {
var imageToAttach string
previewGif := filepath.Join(config.WebRoot, "preview.gif")
thumbnailJpg := filepath.Join(config.WebRoot, "thumbnail.jpg")
if utils.DoesFileExists(previewGif) {
imageToAttach = "preview.gif"
} else if utils.DoesFileExists(thumbnailJpg) {
imageToAttach = "thumbnail.jpg"
}
if imageToAttach != "" {
previewURL.Path = imageToAttach
apmodels.AddImageAttachmentToNote(note, previewURL.String())
}
}
if data.GetNSFW() {
// Mark content as sensitive.
sensitive := streams.NewActivityStreamsSensitiveProperty()
sensitive.AppendXMLSchemaBoolean(true)
note.SetActivityStreamsSensitive(sensitive)
}
b, err := apmodels.Serialize(activity)
if err != nil {
log.Errorln("unable to serialize go live message activity", err)
return errors.New("unable to serialize go live message activity " + err.Error())
}
if err := SendToFollowers(b); err != nil {
return err
}
if err := Add(note, noteID, true); err != nil {
return err
}
return nil
}
// SendPublicMessage will send a public message to all followers.
func SendPublicMessage(textContent string) error {
originalContent := textContent
textContent = utils.RenderSimpleMarkdown(textContent)
tagProp := streams.NewActivityStreamsTagProperty()
// Iterate through the post text and find #Hashtags.
words := strings.Split(originalContent, " ")
for _, word := range words {
if strings.HasPrefix(word, "#") {
tagWithoutHashtag := strings.TrimPrefix(word, "#")
// Replace the instances of the tag with a link to the tag page.
tagHTML := getHashtagLinkHTMLFromTagString(tagWithoutHashtag)
textContent = strings.ReplaceAll(textContent, word, tagHTML)
// Create Hashtag object for the tag.
hashtag := apmodels.MakeHashtag(tagWithoutHashtag)
tagProp.AppendTootHashtag(hashtag)
}
}
activity, _, note, noteID := createBaseOutboundMessage(textContent)
note.SetActivityStreamsTag(tagProp)
b, err := apmodels.Serialize(activity)
if err != nil {
log.Errorln("unable to serialize custom fediverse message activity", err)
return errors.New("unable to serialize custom fediverse message activity " + err.Error())
}
if err := SendToFollowers(b); err != nil {
return err
}
if err := Add(note, noteID, false); err != nil {
return err
}
return nil
}
// nolint: unparam
func createBaseOutboundMessage(textContent string) (vocab.ActivityStreamsCreate, string, vocab.ActivityStreamsNote, string) {
localActor := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername())
noteID := shortid.MustGenerate()
noteIRI := apmodels.MakeLocalIRIForResource(noteID)
id := shortid.MustGenerate()
activity := apmodels.CreateCreateActivity(id, localActor)
object := streams.NewActivityStreamsObjectProperty()
activity.SetActivityStreamsObject(object)
note := apmodels.MakeNote(textContent, noteIRI, localActor)
object.AppendActivityStreamsNote(note)
return activity, id, note, noteID
}
// Get Hashtag HTML link for a given tag (without # prefix).
func getHashtagLinkHTMLFromTagString(baseHashtag string) string {
return fmt.Sprintf("<a class=\"hashtag\" href=\"https://directory.owncast.online/tags/%s\">#%s</a>", baseHashtag, baseHashtag)
}
// SendToFollowers will send an arbitrary payload to all follower inboxes.
func SendToFollowers(payload []byte) error {
localActor := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername())
followers, err := persistence.GetFederationFollowers(-1, 0)
if err != nil {
log.Errorln("unable to fetch followers to send to", err)
return errors.New("unable to fetch followers to send payload to")
}
for _, follower := range followers {
inbox, _ := url.Parse(follower.Inbox)
req, err := requests.CreateSignedRequest(payload, inbox, localActor)
if err != nil {
log.Errorln("unable to create outbox request", follower.Inbox, err)
return errors.New("unable to create outbox request: " + follower.Inbox)
}
workerpool.AddToOutboundQueue(req)
}
return nil
}
// UpdateFollowersWithAccountUpdates will send an update to all followers alerting of a profile update.
func UpdateFollowersWithAccountUpdates() error {
// Don't do anything if federation is disabled.
if !data.GetFederationEnabled() {
return nil
}
id := shortid.MustGenerate()
objectID := apmodels.MakeLocalIRIForResource(id)
activity := apmodels.MakeUpdateActivity(objectID)
actor := streams.NewActivityStreamsPerson()
actorID := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername())
actorIDProperty := streams.NewJSONLDIdProperty()
actorIDProperty.Set(actorID)
actor.SetJSONLDId(actorIDProperty)
actorProperty := streams.NewActivityStreamsActorProperty()
actorProperty.AppendActivityStreamsPerson(actor)
activity.SetActivityStreamsActor(actorProperty)
obj := streams.NewActivityStreamsObjectProperty()
obj.AppendIRI(actorID)
activity.SetActivityStreamsObject(obj)
b, err := apmodels.Serialize(activity)
if err != nil {
log.Errorln("unable to serialize send update actor activity", err)
return errors.New("unable to serialize send update actor activity")
}
return SendToFollowers(b)
}
// Add will save an ActivityPub object to the datastore.
func Add(item vocab.Type, id string, isLiveNotification bool) error {
iri := item.GetJSONLDId().GetIRI().String()
typeString := item.GetTypeName()
if iri == "" {
log.Errorln("Unable to get iri from item")
return errors.New("Unable to get iri from item " + id)
}
b, err := apmodels.Serialize(item)
if err != nil {
log.Errorln("unable to serialize model when saving to outbox", err)
return err
}
return persistence.AddToOutbox(iri, b, typeString, isLiveNotification)
}

View File

@@ -0,0 +1,121 @@
package persistence
import (
"context"
"github.com/owncast/owncast/db"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
func createFederationFollowersTable() {
log.Traceln("Creating federation followers table...")
createTableSQL := `CREATE TABLE IF NOT EXISTS ap_followers (
"iri" TEXT NOT NULL,
"inbox" TEXT NOT NULL,
"name" TEXT,
"username" TEXT NOT NULL,
"image" TEXT,
"request" TEXT NOT NULL,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"approved_at" TIMESTAMP,
"disabled_at" TIMESTAMP,
PRIMARY KEY (iri));
CREATE INDEX iri_index ON ap_followers (iri);
CREATE INDEX approved_at_index ON ap_followers (approved_at);`
stmt, err := _datastore.DB.Prepare(createTableSQL)
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln("error executing sql creating followers table", createTableSQL, err)
}
}
// GetFollowerCount will return the number of followers we're keeping track of.
func GetFollowerCount() (int64, error) {
ctx := context.Background()
return _datastore.GetQueries().GetFollowerCount(ctx)
}
// GetFederationFollowers will return a slice of the followers we keep track of locally.
func GetFederationFollowers(limit int, offset int) ([]models.Follower, error) {
ctx := context.Background()
followersResult, err := _datastore.GetQueries().GetFederationFollowersWithOffset(ctx, db.GetFederationFollowersWithOffsetParams{
Limit: int32(limit),
Offset: int32(offset),
})
if err != nil {
return nil, err
}
followers := make([]models.Follower, 0)
for _, row := range followersResult {
singleFollower := models.Follower{
Name: row.Name.String,
Username: row.Username,
Image: row.Image.String,
ActorIRI: row.Iri,
Inbox: row.Inbox,
Timestamp: utils.NullTime(row.CreatedAt),
}
followers = append(followers, singleFollower)
}
return followers, nil
}
// GetPendingFollowRequests will return pending follow requests.
func GetPendingFollowRequests() ([]models.Follower, error) {
pendingFollowersResult, err := _datastore.GetQueries().GetFederationFollowerApprovalRequests(context.Background())
if err != nil {
return nil, err
}
followers := make([]models.Follower, 0)
for _, row := range pendingFollowersResult {
singleFollower := models.Follower{
Name: row.Name.String,
Username: row.Username,
Image: row.Image.String,
ActorIRI: row.Iri,
Inbox: row.Inbox,
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true},
}
followers = append(followers, singleFollower)
}
return followers, nil
}
// GetBlockedAndRejectedFollowers will return blocked and rejected followers.
func GetBlockedAndRejectedFollowers() ([]models.Follower, error) {
pendingFollowersResult, err := _datastore.GetQueries().GetRejectedAndBlockedFollowers(context.Background())
if err != nil {
return nil, err
}
followers := make([]models.Follower, 0)
for _, row := range pendingFollowersResult {
singleFollower := models.Follower{
Name: row.Name.String,
Username: row.Username,
Image: row.Image.String,
ActorIRI: row.Iri,
DisabledAt: utils.NullTime{Time: row.DisabledAt.Time, Valid: true},
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true},
}
followers = append(followers, singleFollower)
}
return followers, nil
}

View File

@@ -0,0 +1,360 @@
package persistence
import (
"context"
"database/sql"
"fmt"
"net/url"
"time"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/resolvers"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/db"
"github.com/owncast/owncast/models"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
var _datastore *data.Datastore
// Setup will initialize the ActivityPub persistence layer with the provided datastore.
func Setup(datastore *data.Datastore) {
_datastore = datastore
createFederationFollowersTable()
createFederationOutboxTable()
createFederatedActivitiesTable()
}
// AddFollow will save a follow to the datastore.
func AddFollow(follow apmodels.ActivityPubActor, approved bool) error {
log.Traceln("Saving", follow.ActorIri, "as a follower.")
var image string
if follow.Image != nil {
image = follow.Image.String()
}
return createFollow(follow.ActorIri.String(), follow.Inbox.String(), follow.FollowRequestIri.String(), follow.Name, follow.Username, image, approved)
}
// RemoveFollow will remove a follow from the datastore.
func RemoveFollow(unfollow apmodels.ActivityPubActor) error {
log.Traceln("Removing", unfollow.ActorIri, "as a follower.")
return removeFollow(unfollow.ActorIri)
}
// GetFollower will return a single follower/request given an IRI.
func GetFollower(iri string) (*apmodels.ActivityPubActor, error) {
result, err := _datastore.GetQueries().GetFollowerByIRI(context.Background(), iri)
if err != nil {
return nil, err
}
followIRI, err := url.Parse(result.Request)
if err != nil {
return nil, errors.Wrap(err, "error parsing follow request IRI")
}
iriURL, err := url.Parse(result.Iri)
if err != nil {
return nil, errors.Wrap(err, "error parsing actor IRI")
}
inbox, err := url.Parse(result.Inbox)
if err != nil {
return nil, errors.Wrap(err, "error parsing acting inbox")
}
image, _ := url.Parse(result.Image.String)
var disabledAt *time.Time
if result.DisabledAt.Valid {
disabledAt = &result.DisabledAt.Time
}
follower := apmodels.ActivityPubActor{
ActorIri: iriURL,
Inbox: inbox,
Name: result.Name.String,
Username: result.Username,
Image: image,
FollowRequestIri: followIRI,
DisabledAt: disabledAt,
}
return &follower, nil
}
// ApprovePreviousFollowRequest will approve a follow request.
func ApprovePreviousFollowRequest(iri string) error {
return _datastore.GetQueries().ApproveFederationFollower(context.Background(), db.ApproveFederationFollowerParams{
Iri: iri,
ApprovedAt: sql.NullTime{
Time: time.Now(),
Valid: true,
},
})
}
// BlockOrRejectFollower will block an existing follower or reject a follow request.
func BlockOrRejectFollower(iri string) error {
return _datastore.GetQueries().RejectFederationFollower(context.Background(), db.RejectFederationFollowerParams{
Iri: iri,
DisabledAt: sql.NullTime{
Time: time.Now(),
Valid: true,
},
})
}
func createFollow(actor string, inbox string, request string, name string, username string, image string, approved bool) error {
tx, err := _datastore.DB.Begin()
if err != nil {
log.Debugln(err)
}
defer func() {
_ = tx.Rollback()
}()
var approvedAt sql.NullTime
if approved {
approvedAt = sql.NullTime{
Time: time.Now(),
Valid: true,
}
}
if err = _datastore.GetQueries().WithTx(tx).AddFollower(context.Background(), db.AddFollowerParams{
Iri: actor,
Inbox: inbox,
Name: sql.NullString{String: name, Valid: true},
Username: username,
Image: sql.NullString{String: image, Valid: true},
ApprovedAt: approvedAt,
Request: request,
}); err != nil {
log.Errorln("error creating new federation follow: ", err)
}
return tx.Commit()
}
// UpdateFollower will update the details of a stored follower given an IRI.
func UpdateFollower(actorIRI string, inbox string, name string, username string, image string) error {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
log.Debugln(err)
}
defer func() {
_ = tx.Rollback()
}()
if err = _datastore.GetQueries().WithTx(tx).UpdateFollowerByIRI(context.Background(), db.UpdateFollowerByIRIParams{
Inbox: inbox,
Name: sql.NullString{String: name, Valid: true},
Username: username,
Image: sql.NullString{String: image, Valid: true},
Iri: actorIRI,
}); err != nil {
return fmt.Errorf("error updating follower %s %s", actorIRI, err)
}
return tx.Commit()
}
func removeFollow(actor *url.URL) error {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
if err := _datastore.GetQueries().WithTx(tx).RemoveFollowerByIRI(context.Background(), actor.String()); err != nil {
return err
}
return tx.Commit()
}
// createFederatedActivitiesTable will create the accepted
// activities table if needed.
func createFederatedActivitiesTable() {
createTableSQL := `CREATE TABLE IF NOT EXISTS ap_accepted_activities (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"iri" TEXT NOT NULL,
"actor" TEXT NOT NULL,
"type" TEXT NOT NULL,
"timestamp" TIMESTAMP NOT NULL
);
CREATE INDEX iri_actor_index ON ap_accepted_activities (iri,actor);`
stmt, err := _datastore.DB.Prepare(createTableSQL)
if err != nil {
log.Fatal("error creating inbox table", err)
}
defer stmt.Close()
if _, err := stmt.Exec(); err != nil {
log.Fatal("error creating inbound federated activities table", err)
}
}
func createFederationOutboxTable() {
log.Traceln("Creating federation outbox table...")
createTableSQL := `CREATE TABLE IF NOT EXISTS ap_outbox (
"iri" TEXT NOT NULL,
"value" BLOB,
"type" TEXT NOT NULL,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"live_notification" BOOLEAN DEFAULT FALSE,
PRIMARY KEY (iri));
CREATE INDEX iri ON ap_outbox (iri);
CREATE INDEX type ON ap_outbox (type);
CREATE INDEX live_notification ON ap_outbox (live_notification);`
stmt, err := _datastore.DB.Prepare(createTableSQL)
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln("error executing sql creating outbox table", createTableSQL, err)
}
}
// GetOutboxPostCount will return the number of posts in the outbox.
func GetOutboxPostCount() (int64, error) {
ctx := context.Background()
return _datastore.GetQueries().GetLocalPostCount(ctx)
}
// GetOutbox will return an instance of the outbox populated by stored items.
func GetOutbox(limit int, offset int) (vocab.ActivityStreamsOrderedCollection, error) {
collection := streams.NewActivityStreamsOrderedCollection()
orderedItems := streams.NewActivityStreamsOrderedItemsProperty()
rows, err := _datastore.GetQueries().GetOutboxWithOffset(
context.Background(),
db.GetOutboxWithOffsetParams{Limit: int32(limit), Offset: int32(offset)},
)
if err != nil {
return collection, err
}
for _, value := range rows {
createCallback := func(c context.Context, activity vocab.ActivityStreamsCreate) error {
orderedItems.AppendActivityStreamsCreate(activity)
return nil
}
if err := resolvers.Resolve(context.Background(), value, createCallback); err != nil {
return collection, err
}
}
return collection, nil
}
// AddToOutbox will store a single payload to the persistence layer.
func AddToOutbox(iri string, itemData []byte, typeString string, isLiveNotification bool) error {
tx, err := _datastore.DB.Begin()
if err != nil {
log.Debugln(err)
}
defer func() {
_ = tx.Rollback()
}()
if err = _datastore.GetQueries().WithTx(tx).AddToOutbox(context.Background(), db.AddToOutboxParams{
Iri: iri,
Value: itemData,
Type: typeString,
LiveNotification: sql.NullBool{Bool: isLiveNotification, Valid: true},
}); err != nil {
return fmt.Errorf("error creating new item in federation outbox %s", err)
}
return tx.Commit()
}
// GetObjectByID will return a string representation of a single object by the ID.
func GetObjectByID(id string) (string, error) {
value, err := _datastore.GetQueries().GetObjectFromOutboxByID(context.Background(), id)
return string(value), err
}
// GetObjectByIRI will return a string representation of a single object by the IRI.
func GetObjectByIRI(iri string) (string, bool, time.Time, error) {
row, err := _datastore.GetQueries().GetObjectFromOutboxByIRI(context.Background(), iri)
return string(row.Value), row.LiveNotification.Bool, row.CreatedAt.Time, err
}
// GetLocalPostCount will return the number of posts existing locally.
func GetLocalPostCount() (int64, error) {
ctx := context.Background()
return _datastore.GetQueries().GetLocalPostCount(ctx)
}
// SaveInboundFediverseActivity will save an event to the ap_inbound_activities table.
func SaveInboundFediverseActivity(objectIRI string, actorIRI string, eventType string, timestamp time.Time) error {
if err := _datastore.GetQueries().AddToAcceptedActivities(context.Background(), db.AddToAcceptedActivitiesParams{
Iri: objectIRI,
Actor: actorIRI,
Type: eventType,
Timestamp: timestamp,
}); err != nil {
return errors.Wrap(err, "error saving event "+objectIRI)
}
return nil
}
// GetInboundActivities will return a collection of saved, federated activities
// limited and offset by the values provided to support pagination.
func GetInboundActivities(limit int, offset int) ([]models.FederatedActivity, error) {
ctx := context.Background()
rows, err := _datastore.GetQueries().GetInboundActivitiesWithOffset(ctx, db.GetInboundActivitiesWithOffsetParams{
Limit: int32(limit),
Offset: int32(offset),
})
if err != nil {
return nil, err
}
activities := make([]models.FederatedActivity, 0)
for _, row := range rows {
singleActivity := models.FederatedActivity{
IRI: row.Iri,
ActorIRI: row.Actor,
Type: row.Type,
Timestamp: row.Timestamp,
}
activities = append(activities, singleActivity)
}
return activities, nil
}
// HasPreviouslyHandledInboundActivity will return if we have previously handled
// an inbound federated activity.
func HasPreviouslyHandledInboundActivity(iri string, actorIRI string, eventType string) (bool, error) {
exists, err := _datastore.GetQueries().DoesInboundActivityExist(context.Background(), db.DoesInboundActivityExistParams{
Iri: iri,
Actor: actorIRI,
Type: eventType,
})
if err != nil {
return false, err
}
return exists > 0, nil
}

View File

@@ -0,0 +1,51 @@
package requests
import (
"encoding/json"
"net/url"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/workerpool"
"github.com/teris-io/shortid"
)
// SendFollowAccept will send an accept activity to a follow request from a specified local user.
func SendFollowAccept(inbox *url.URL, followRequestIRI *url.URL, fromLocalAccountName string) error {
followAccept := makeAcceptFollow(followRequestIRI, fromLocalAccountName)
localAccountIRI := apmodels.MakeLocalIRIForAccount(fromLocalAccountName)
var jsonmap map[string]interface{}
jsonmap, _ = streams.Serialize(followAccept)
b, _ := json.Marshal(jsonmap)
req, err := CreateSignedRequest(b, inbox, localAccountIRI)
if err != nil {
return err
}
workerpool.AddToOutboundQueue(req)
return nil
}
func makeAcceptFollow(followRequestIri *url.URL, fromAccountName string) vocab.ActivityStreamsAccept {
acceptIDString := shortid.MustGenerate()
acceptID := apmodels.MakeLocalIRIForResource(acceptIDString)
actorID := apmodels.MakeLocalIRIForAccount(fromAccountName)
accept := streams.NewActivityStreamsAccept()
idProperty := streams.NewJSONLDIdProperty()
idProperty.SetIRI(acceptID)
accept.SetJSONLDId(idProperty)
actor := apmodels.MakeActorPropertyWithID(actorID)
accept.SetActivityStreamsActor(actor)
object := streams.NewActivityStreamsObjectProperty()
object.AppendIRI(followRequestIri)
accept.SetActivityStreamsObject(object)
return accept
}

View File

@@ -0,0 +1,75 @@
package requests
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/config"
log "github.com/sirupsen/logrus"
)
// WriteStreamResponse will write a ActivityPub object to the provided ResponseWriter and sign with the provided key.
func WriteStreamResponse(item vocab.Type, w http.ResponseWriter, publicKey crypto.PublicKey) error {
var jsonmap map[string]interface{}
jsonmap, _ = streams.Serialize(item)
b, err := json.Marshal(jsonmap)
if err != nil {
return err
}
return WriteResponse(b, w, publicKey)
}
// WritePayloadResponse will write any arbitrary object to the provided ResponseWriter and sign with the provided key.
func WritePayloadResponse(payload interface{}, w http.ResponseWriter, publicKey crypto.PublicKey) error {
b, err := json.Marshal(payload)
if err != nil {
return err
}
return WriteResponse(b, w, publicKey)
}
// WriteResponse will write any arbitrary payload to the provided ResponseWriter and sign with the provided key.
func WriteResponse(payload []byte, w http.ResponseWriter, publicKey crypto.PublicKey) error {
w.Header().Set("Content-Type", "application/activity+json")
if err := crypto.SignResponse(w, payload, publicKey); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Errorln("unable to sign response", err)
return err
}
if _, err := w.Write(payload); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return err
}
return nil
}
// CreateSignedRequest will create a signed POST request of a payload to the provided destination.
func CreateSignedRequest(payload []byte, url *url.URL, fromActorIRI *url.URL) (*http.Request, error) {
log.Debugln("Sending", string(payload), "to", url)
req, _ := http.NewRequest("POST", url.String(), bytes.NewBuffer(payload))
ua := fmt.Sprintf("%s; https://owncast.online", config.GetReleaseString())
req.Header.Set("User-Agent", ua)
req.Header.Set("Content-Type", "application/activity+json")
if err := crypto.SignRequest(req, payload, fromActorIRI); err != nil {
log.Errorln("error signing request:", err)
return nil, err
}
return req, nil
}

View File

@@ -0,0 +1,57 @@
package resolvers
import (
"context"
"errors"
"fmt"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
log "github.com/sirupsen/logrus"
)
func getPersonFromFollow(activity vocab.ActivityStreamsFollow) (apmodels.ActivityPubActor, error) {
return GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
}
// MakeFollowRequest will convert an inbound Follow request to our internal actor model.
func MakeFollowRequest(c context.Context, activity vocab.ActivityStreamsFollow) (*apmodels.ActivityPubActor, error) {
person, err := getPersonFromFollow(activity)
if err != nil {
return nil, errors.New("unable to resolve person from follow request: " + err.Error())
}
hostname := person.ActorIri.Hostname()
username := person.Username
fullUsername := fmt.Sprintf("%s@%s", username, hostname)
followRequest := apmodels.ActivityPubActor{
ActorIri: person.ActorIri,
FollowRequestIri: activity.GetJSONLDId().Get(),
Inbox: person.Inbox,
Name: person.Name,
Username: fullUsername,
Image: person.Image,
}
return &followRequest, nil
}
// MakeUnFollowRequest will convert an inbound Unfollow request to our internal actor model.
func MakeUnFollowRequest(c context.Context, activity vocab.ActivityStreamsUndo) *apmodels.ActivityPubActor {
person, err := GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
if err != nil {
log.Errorln("unable to resolve person from actor iri", person.ActorIri, err)
return nil
}
unfollowRequest := apmodels.ActivityPubActor{
ActorIri: person.ActorIri,
FollowRequestIri: activity.GetJSONLDId().Get(),
Inbox: person.Inbox,
Name: person.Name,
Image: person.Image,
}
return &unfollowRequest
}

View File

@@ -0,0 +1,125 @@
package resolvers
import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/core/data"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// Resolve will translate a raw ActivityPub payload and fire the callback associated with that activity type.
func Resolve(c context.Context, data []byte, callbacks ...interface{}) error {
jsonResolver, err := streams.NewJSONResolver(callbacks...)
if err != nil {
// Something in the setup was wrong. For example, a callback has an
// unsupported signature and would never be called
return err
}
var jsonMap map[string]interface{}
if err = json.Unmarshal(data, &jsonMap); err != nil {
return err
}
log.Debugln("Resolving payload...", string(data))
// The createCallback function will be called.
err = jsonResolver.Resolve(c, jsonMap)
if err != nil && !streams.IsUnmatchedErr(err) {
// Something went wrong
return err
} else if streams.IsUnmatchedErr(err) {
// Everything went right but the callback didn't match or the ActivityStreams
// type is one that wasn't code generated.
log.Debugln("No match: ", err)
}
return nil
}
// ResolveIRI will resolve an IRI ahd call the correct callback for the resolved type.
func ResolveIRI(c context.Context, iri string, callbacks ...interface{}) error {
log.Debugln("Resolving", iri)
req, _ := http.NewRequest("GET", iri, nil)
actor := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername())
if err := crypto.SignRequest(req, nil, actor); err != nil {
return err
}
response, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer response.Body.Close()
data, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
}
// fmt.Println(string(data))
return Resolve(c, data, callbacks...)
}
// GetResolvedActorFromActorProperty resolve an actor property to a fully populated person.
func GetResolvedActorFromActorProperty(actor vocab.ActivityStreamsActorProperty) (apmodels.ActivityPubActor, error) {
var err error
var apActor apmodels.ActivityPubActor
personCallback := func(c context.Context, person vocab.ActivityStreamsPerson) error {
apActor = apmodels.MakeActorFromPerson(person)
return nil
}
serviceCallback := func(c context.Context, s vocab.ActivityStreamsService) error {
apActor = apmodels.MakeActorFromService(s)
return nil
}
for iter := actor.Begin(); iter != actor.End(); iter = iter.Next() {
if iter.IsIRI() {
iri := iter.GetIRI()
if e := ResolveIRI(context.Background(), iri.String(), personCallback, serviceCallback); e != nil {
err = e
}
} else if iter.IsActivityStreamsPerson() {
person := iter.GetActivityStreamsPerson()
apActor = apmodels.MakeActorFromPerson(person)
}
}
return apActor, errors.Wrap(err, "unable to resolve actor from actor property")
}
// GetResolvedActorFromIRI will resolve an IRI string to a fully populated actor.
func GetResolvedActorFromIRI(personOrServiceIRI string) (apmodels.ActivityPubActor, error) {
var err error
var apActor apmodels.ActivityPubActor
personCallback := func(c context.Context, person vocab.ActivityStreamsPerson) error {
apActor = apmodels.MakeActorFromPerson(person)
return nil
}
serviceCallback := func(c context.Context, s vocab.ActivityStreamsService) error {
apActor = apmodels.MakeActorFromService(s)
return nil
}
if e := ResolveIRI(context.Background(), personOrServiceIRI, personCallback, serviceCallback); e != nil {
err = e
}
return apActor, errors.Wrap(err, "unable to resolve actor from IRI string: "+personOrServiceIRI)
}

35
activitypub/router.go Normal file
View File

@@ -0,0 +1,35 @@
package activitypub
import (
"net/http"
"github.com/owncast/owncast/activitypub/controllers"
"github.com/owncast/owncast/router/middleware"
)
// StartRouter will start the federation specific http router.
func StartRouter() {
// WebFinger
http.HandleFunc("/.well-known/webfinger", controllers.WebfingerHandler)
// Host Metadata
http.HandleFunc("/.well-known/host-meta", controllers.HostMetaController)
// Nodeinfo v1
http.HandleFunc("/.well-known/nodeinfo", controllers.NodeInfoController)
// x-nodeinfo v2
http.HandleFunc("/.well-known/x-nodeinfo2", controllers.XNodeInfo2Controller)
// Nodeinfo v2
http.HandleFunc("/nodeinfo/2.0", controllers.NodeInfoV2Controller)
// Instance details
http.HandleFunc("/api/v1/instance", controllers.InstanceV1Controller)
// Single ActivityPub Actor
http.HandleFunc("/federation/user/", middleware.RequireActivityPubOrRedirect(controllers.ActorHandler))
// Single AP object
http.HandleFunc("/federation/", middleware.RequireActivityPubOrRedirect(controllers.ObjectHandler))
}

View File

@@ -0,0 +1,66 @@
package workerpool
import (
"net/http"
log "github.com/sirupsen/logrus"
)
const (
// ActivityPubWorkerPoolSize defines the number of concurrent HTTP ActivityPub requests.
ActivityPubWorkerPoolSize = 10
)
// Job struct bundling the ActivityPub and the payload in one struct.
type Job struct {
request *http.Request
}
var queue chan Job
// InitOutboundWorkerPool starts n go routines that await ActivityPub jobs.
func InitOutboundWorkerPool() {
queue = make(chan Job)
// start workers
for i := 1; i <= ActivityPubWorkerPoolSize; i++ {
go worker(i, queue)
}
}
// AddToOutboundQueue will queue up an outbound http request.
func AddToOutboundQueue(req *http.Request) {
log.Tracef("Queued request for ActivityPub destination %s", req.RequestURI)
queue <- Job{req}
}
func worker(workerID int, queue <-chan Job) {
log.Debugf("Started ActivityPub worker %d", workerID)
for job := range queue {
if err := sendActivityPubMessageToInbox(job); err != nil {
log.Errorf("ActivityPub destination %s failed to send Error: %s", job.request.RequestURI, err)
}
log.Tracef("Done with ActivityPub destination %s using worker %d", job.request.RequestURI, workerID)
}
}
func sendActivityPubMessageToInbox(job Job) error {
// req, err := http.NewRequest("POST", job.inbox.String(), bytes.NewReader(job.payload))
// if err != nil {
// return err
// }
// req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(job.request)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}