Connected clients admin API (#217)
* Add support for ending the inbound stream. Closes #191 * Add a simple success response to API requests * Connected clients API with geo details * Post-rebase cleanup * Make setting and reading geo details separate operations to unblock and speed up * Rename file * Fire geoip api call behind goroutine * Add comment * Post-rebase fixes * Add support for the MaxMind GeoLite2 GeoIP database
This commit is contained in:
@@ -70,3 +70,12 @@ func GetMessages() []models.ChatMessage {
|
||||
|
||||
return getChatHistory()
|
||||
}
|
||||
|
||||
func GetClient(clientID string) *Client {
|
||||
for _, client := range _server.Clients {
|
||||
if client.ClientID == clientID {
|
||||
return client
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/net/websocket"
|
||||
|
||||
"github.com/owncast/owncast/geoip"
|
||||
"github.com/owncast/owncast/models"
|
||||
"github.com/owncast/owncast/utils"
|
||||
|
||||
@@ -21,8 +22,12 @@ const channelBufSize = 100
|
||||
type Client struct {
|
||||
ConnectedAt time.Time
|
||||
MessageCount int
|
||||
UserAgent string
|
||||
IPAddress string
|
||||
Username *string
|
||||
ClientID string // How we identify unique viewers when counting viewer counts.
|
||||
Geo *geoip.GeoDetails `json:"geo"`
|
||||
|
||||
clientID string // How we identify unique viewers when counting viewer counts.
|
||||
socketID string // How we identify a single websocket client.
|
||||
ws *websocket.Conn
|
||||
ch chan models.ChatMessage
|
||||
@@ -50,10 +55,12 @@ func NewClient(ws *websocket.Conn) *Client {
|
||||
pingch := make(chan models.PingMessage)
|
||||
usernameChangeChannel := make(chan models.NameChangeEvent)
|
||||
|
||||
ipAddress := utils.GetIPAddressFromRequest(ws.Request())
|
||||
userAgent := ws.Request().UserAgent()
|
||||
clientID := utils.GenerateClientIDFromRequest(ws.Request())
|
||||
socketID, _ := shortid.Generate()
|
||||
|
||||
return &Client{time.Now(), 0, clientID, socketID, ws, ch, pingch, usernameChangeChannel, doneCh}
|
||||
return &Client{time.Now(), 0, userAgent, ipAddress, nil, clientID, nil, socketID, ws, ch, pingch, usernameChangeChannel, doneCh}
|
||||
}
|
||||
|
||||
//GetConnection gets the connection for the client
|
||||
@@ -66,7 +73,7 @@ func (c *Client) Write(msg models.ChatMessage) {
|
||||
case c.ch <- msg:
|
||||
default:
|
||||
_server.remove(c)
|
||||
_server.err(fmt.Errorf("client %s is disconnected", c.clientID))
|
||||
_server.err(fmt.Errorf("client %s is disconnected", c.ClientID))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,6 +160,7 @@ func (c *Client) userChangedName(data []byte) {
|
||||
msg.Type = NAMECHANGE
|
||||
msg.ID = shortid.MustGenerate()
|
||||
_server.usernameChanged(msg)
|
||||
c.Username = &msg.NewName
|
||||
}
|
||||
|
||||
func (c *Client) chatMessageReceived(data []byte) {
|
||||
@@ -168,7 +176,21 @@ func (c *Client) chatMessageReceived(data []byte) {
|
||||
msg.Visible = true
|
||||
|
||||
c.MessageCount++
|
||||
c.Username = &msg.Author
|
||||
|
||||
msg.ClientID = c.clientID
|
||||
msg.ClientID = c.ClientID
|
||||
_server.SendToAll(msg)
|
||||
}
|
||||
|
||||
// GetViewerClientFromChatClient returns a general models.Client from a chat websocket client.
|
||||
func (c *Client) GetViewerClientFromChatClient() models.Client {
|
||||
return models.Client{
|
||||
ConnectedAt: c.ConnectedAt,
|
||||
MessageCount: c.MessageCount,
|
||||
UserAgent: c.UserAgent,
|
||||
IPAddress: c.IPAddress,
|
||||
Username: c.Username,
|
||||
ClientID: c.ClientID,
|
||||
Geo: geoip.GetGeoFromIP(c.IPAddress),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func (s *server) onConnection(ws *websocket.Conn) {
|
||||
client := NewClient(ws)
|
||||
|
||||
defer func() {
|
||||
log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(client.ConnectedAt), client.MessageCount, client.clientID)
|
||||
log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(client.ConnectedAt), client.MessageCount, client.ClientID)
|
||||
|
||||
if err := ws.Close(); err != nil {
|
||||
s.errCh <- err
|
||||
@@ -102,13 +102,13 @@ func (s *server) Listen() {
|
||||
// add new a client
|
||||
case c := <-s.addCh:
|
||||
s.Clients[c.socketID] = c
|
||||
s.listener.ClientAdded(c.clientID)
|
||||
s.listener.ClientAdded(c.GetViewerClientFromChatClient())
|
||||
s.sendWelcomeMessageToClient(c)
|
||||
|
||||
// remove a client
|
||||
case c := <-s.delCh:
|
||||
delete(s.Clients, c.socketID)
|
||||
s.listener.ClientRemoved(c.clientID)
|
||||
s.listener.ClientRemoved(c.ClientID)
|
||||
|
||||
// broadcast a message to all clients
|
||||
case msg := <-s.sendAllCh:
|
||||
@@ -138,3 +138,13 @@ func (s *server) sendWelcomeMessageToClient(c *Client) {
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
func (s *server) getClientForClientID(clientID string) *Client {
|
||||
for _, client := range s.Clients {
|
||||
if client.ClientID == clientID {
|
||||
return client
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -11,8 +11,8 @@ import (
|
||||
type ChatListenerImpl struct{}
|
||||
|
||||
//ClientAdded is for when a client is added the system
|
||||
func (cl ChatListenerImpl) ClientAdded(clientID string) {
|
||||
SetClientActive(clientID)
|
||||
func (cl ChatListenerImpl) ClientAdded(client models.Client) {
|
||||
SetClientActive(client)
|
||||
}
|
||||
|
||||
//ClientRemoved is for when a client disconnects/is removed
|
||||
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/owncast/owncast/config"
|
||||
"github.com/owncast/owncast/core/chat"
|
||||
"github.com/owncast/owncast/geoip"
|
||||
"github.com/owncast/owncast/models"
|
||||
"github.com/owncast/owncast/utils"
|
||||
)
|
||||
@@ -55,9 +57,13 @@ func setupStats() error {
|
||||
}
|
||||
|
||||
func purgeStaleViewers() {
|
||||
for clientID, lastConnectedtime := range _stats.Clients {
|
||||
timeSinceLastActive := time.Since(lastConnectedtime).Minutes()
|
||||
if timeSinceLastActive > 2 {
|
||||
for clientID, client := range _stats.Clients {
|
||||
if client.LastSeen.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
timeSinceLastActive := time.Since(client.LastSeen).Minutes()
|
||||
if timeSinceLastActive > 1 {
|
||||
RemoveClient(clientID)
|
||||
}
|
||||
}
|
||||
@@ -80,13 +86,22 @@ func IsStreamConnected() bool {
|
||||
}
|
||||
|
||||
//SetClientActive sets a client as active and connected
|
||||
func SetClientActive(clientID string) {
|
||||
// if _, ok := s.clients[clientID]; !ok {
|
||||
// fmt.Println("Marking client active:", clientID, s.GetViewerCount()+1, "clients connected.")
|
||||
// }
|
||||
|
||||
func SetClientActive(client models.Client) {
|
||||
l.Lock()
|
||||
_stats.Clients[clientID] = time.Now()
|
||||
// If this clientID already exists then update it.
|
||||
// Otherwise set a new one.
|
||||
if existingClient, ok := _stats.Clients[client.ClientID]; ok {
|
||||
existingClient.LastSeen = time.Now()
|
||||
existingClient.Username = client.Username
|
||||
existingClient.MessageCount = client.MessageCount
|
||||
existingClient.Geo = geoip.GetGeoFromIP(existingClient.IPAddress)
|
||||
_stats.Clients[client.ClientID] = existingClient
|
||||
} else {
|
||||
if client.Geo == nil {
|
||||
geoip.FetchGeoForIP(client.IPAddress)
|
||||
}
|
||||
_stats.Clients[client.ClientID] = client
|
||||
}
|
||||
l.Unlock()
|
||||
|
||||
// Don't update viewer counts if a live stream session is not active.
|
||||
@@ -103,6 +118,19 @@ func RemoveClient(clientID string) {
|
||||
delete(_stats.Clients, clientID)
|
||||
}
|
||||
|
||||
func GetClients() []models.Client {
|
||||
clients := make([]models.Client, 0)
|
||||
for _, client := range _stats.Clients {
|
||||
chatClient := chat.GetClient(client.ClientID)
|
||||
if chatClient != nil {
|
||||
clients = append(clients, chatClient.GetViewerClientFromChatClient())
|
||||
} else {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
}
|
||||
return clients
|
||||
}
|
||||
|
||||
func saveStatsToFile() error {
|
||||
jsonData, err := json.Marshal(_stats)
|
||||
if err != nil {
|
||||
@@ -125,7 +153,7 @@ func saveStatsToFile() error {
|
||||
|
||||
func getSavedStats() (models.Stats, error) {
|
||||
result := models.Stats{
|
||||
Clients: make(map[string]time.Time),
|
||||
Clients: make(map[string]models.Client),
|
||||
}
|
||||
|
||||
if !utils.DoesFileExists(statsFilePath) {
|
||||
|
||||
Reference in New Issue
Block a user