oktopus/backend/services/stomp/server/client/subscription_list.go
2023-10-28 16:00:27 -03:00

108 lines
3.0 KiB
Go

package client
import (
"container/list"
)
// SubscriptionList holds subscriptions in insertion order.
//
// No synchronization is performed by this type, so it is not safe for
// concurrent use; callers must serialize access themselves.
type SubscriptionList struct {
	// subs stores *Subscription values in a doubly linked list.
	//
	// TODO: implement the linked list locally by adding next and prev
	// pointers to the Subscription struct itself.
	subs *list.List
}
// NewSubscriptionList returns an empty subscription list that is ready
// to use.
func NewSubscriptionList() *SubscriptionList {
	sl := new(SubscriptionList)
	sl.subs = list.New()
	return sl
}
// Add appends sub to the back of the list and records this list as the
// subscription's owner.
//
// It panics if sub is already marked as belonging to a subscription
// list (its subList field is non-nil).
func (sl *SubscriptionList) Add(sub *Subscription) {
	if owner := sub.subList; owner != nil {
		panic("subscription is already in a subscription list")
	}

	sl.subs.PushBack(sub)
	sub.subList = sl
}
// Get removes the subscription at the front of the list and returns it.
// It returns nil when the list is empty. The returned subscription is
// no longer marked as belonging to a list.
func (sl *SubscriptionList) Get() *Subscription {
	head := sl.subs.Front()
	if head == nil {
		// Empty list: nothing to hand out.
		return nil
	}

	sub := head.Value.(*Subscription)
	sl.subs.Remove(head)
	sub.subList = nil
	return sub
}
// Remove deletes the first entry that is the same pointer as s and
// clears the subscription's list ownership. If s is not in the list,
// Remove does nothing.
func (sl *SubscriptionList) Remove(s *Subscription) {
	// Walk forward until the matching element or the end of the list.
	elem := sl.subs.Front()
	for elem != nil && elem.Value.(*Subscription) != s {
		elem = elem.Next()
	}
	if elem == nil {
		return
	}

	sl.subs.Remove(elem)
	s.subList = nil
}
// FindByIdAndRemove searches for the first subscription whose id equals
// id and removes it from the list.
//
// It returns the removed subscription, or nil if no subscription with
// that id is present. The returned subscription is detached: its list
// ownership is cleared so that it can later be passed to Add again
// (Add panics on a subscription that is still marked as being in a list).
func (sl *SubscriptionList) FindByIdAndRemove(id string) *Subscription {
	for e := sl.subs.Front(); e != nil; e = e.Next() {
		sub := e.Value.(*Subscription)
		if sub.id != id {
			continue
		}
		sl.subs.Remove(e)
		// Keep the ownership marker consistent with Get and Remove.
		sub.subList = nil
		return sub
	}
	return nil
}
// Ack removes every subscription in the list for which IsAckedBy(msgId)
// reports true, where msgId is the value of the message-id (or ack)
// header, and invokes callback once for each removed subscription.
//
// Each subscription is detached (its list ownership is cleared) before
// callback runs, so the callback, or code it triggers, may add the
// subscription to a list again without tripping the panic in Add.
func (sl *SubscriptionList) Ack(msgId uint64, callback func(s *Subscription)) {
	for e := sl.subs.Front(); e != nil; {
		// Capture the successor first: removing e invalidates e.Next().
		next := e.Next()
		sub := e.Value.(*Subscription)
		if sub.IsAckedBy(msgId) {
			sl.subs.Remove(e)
			sub.subList = nil
			callback(sub)
		}
		e = next
	}
}
// Nack removes every subscription in the list for which
// IsNackedBy(msgId) reports true, where msgId is the value of the
// message-id (or ack) header, and invokes callback once for each removed
// subscription.
//
// Each subscription is detached (its list ownership is cleared) before
// callback runs, so the callback, or code it triggers, may add the
// subscription to a list again without tripping the panic in Add.
//
// NOTE: the current understanding is that all NACKs are individual, but
// this has not been confirmed.
func (sl *SubscriptionList) Nack(msgId uint64, callback func(s *Subscription)) {
	for e := sl.subs.Front(); e != nil; {
		// Capture the successor first: removing e invalidates e.Next().
		next := e.Next()
		sub := e.Value.(*Subscription)
		if sub.IsNackedBy(msgId) {
			sl.subs.Remove(e)
			sub.subList = nil
			callback(sub)
		}
		e = next
	}
}
// ForEach calls callback once for every subscription in the list, from
// front to back. The isLast argument is true only for the final
// subscription visited.
func (sl *SubscriptionList) ForEach(callback func(s *Subscription, isLast bool)) {
	cur := sl.subs.Front()
	for cur != nil {
		// The successor is looked up before the callback is invoked, so
		// iteration continues from it regardless of what the callback does
		// to the current element.
		following := cur.Next()
		sub := cur.Value.(*Subscription)
		callback(sub, following == nil)
		cur = following
	}
}