oktopus/backend/services/mtp/stomp/server/queue/memory_queue.go

71 lines
1.4 KiB
Go

package queue
import (
"container/list"
"github.com/go-stomp/stomp/v3/frame"
)
// In-memory implementation of the QueueStorage interface.
type MemoryQueueStorage struct {
lists map[string]*list.List
}
func NewMemoryQueueStorage() Storage {
m := &MemoryQueueStorage{lists: make(map[string]*list.List)}
return m
}
func (m *MemoryQueueStorage) Enqueue(queue string, frame *frame.Frame) error {
l, ok := m.lists[queue]
if !ok {
l = list.New()
m.lists[queue] = l
}
l.PushBack(frame)
return nil
}
// Pushes a frame to the head of the queue. Sets
// the "message-id" header of the frame if it is not
// already set.
func (m *MemoryQueueStorage) Requeue(queue string, frame *frame.Frame) error {
l, ok := m.lists[queue]
if !ok {
l = list.New()
m.lists[queue] = l
}
l.PushFront(frame)
return nil
}
// Removes a frame from the head of the queue.
// Returns nil if no frame is available.
func (m *MemoryQueueStorage) Dequeue(queue string) (*frame.Frame, error) {
l, ok := m.lists[queue]
if !ok {
return nil, nil
}
element := l.Front()
if element == nil {
return nil, nil
}
return l.Remove(element).(*frame.Frame), nil
}
// Called at server startup. Allows the queue storage
// to perform any initialization.
func (m *MemoryQueueStorage) Start() {
m.lists = make(map[string]*list.List)
}
// Called prior to server shutdown. Allows the queue storage
// to perform any cleanup.
func (m *MemoryQueueStorage) Stop() {
m.lists = nil
}