782 lines
22 KiB
Go
782 lines
22 KiB
Go
package client
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/go-stomp/stomp/v3"
|
|
"github.com/go-stomp/stomp/v3/frame"
|
|
)
|
|
|
|
// Buffer capacities for the channels owned by a client connection.
const (
	// maxPendingWrites is the maximum number of frames that may be
	// pending for delivery to a client. It is used as the capacity of
	// the subscription and write channels, so that a client which cannot
	// keep up with the server does not cause the server to backlog
	// pending frames indefinitely.
	maxPendingWrites = 16

	// maxPendingReads is the maximum number of frames read from the
	// client that may be pending before the read go-routine starts
	// blocking.
	maxPendingReads = 16
)
|
|
|
|
// Conn represents a connection with a STOMP client.
//
// A Conn is created with NewConn, which also starts the two go-routines
// that service it: readLoop (assembles frames read from the network) and
// processLoop (handles every frame read from, or destined for, the client).
type Conn struct {
	config         Config                              // Process-wide configuration supplied to NewConn
	rw             net.Conn                            // Network connection to client
	writer         *frame.Writer                       // Writes STOMP frames directly to the network connection
	requestChannel chan Request                        // For sending requests to upper layer
	subChannel     chan *Subscription                  // Receives subscription messages for client
	writeChannel   chan *frame.Frame                   // Receives unacknowledged (topic) messages for client
	readChannel    chan *frame.Frame                   // Receives frames from the client
	stateFunc      func(c *Conn, f *frame.Frame) error // State processing function
	writeTimeout   time.Duration                       // Heart beat write timeout
	version        stomp.Version                       // Negotiated STOMP protocol version
	closed         bool                                // Is the connection closed
	txStore        *txStore                            // Stores transactions in progress
	lastMsgId      uint64                              // last message-id value
	subList        *SubscriptionList                   // List of subscriptions requiring acknowledgement
	subs           map[string]*Subscription            // All subscriptions, keyed by id
	validator      stomp.Validator                     // For validating STOMP frames
	log            stomp.Logger                        // Logger obtained from config.Logger()
}
|
|
|
|
// Creates a new client connection. The config parameter contains
|
|
// process-wide configuration parameters relevant to a client connection.
|
|
// The rw parameter is a network connection object for communicating with
|
|
// the client. All client requests are sent via the ch channel to the
|
|
// upper layer.
|
|
func NewConn(config Config, rw net.Conn, ch chan Request) *Conn {
|
|
c := &Conn{
|
|
config: config,
|
|
rw: rw,
|
|
requestChannel: ch,
|
|
subChannel: make(chan *Subscription, maxPendingWrites),
|
|
writeChannel: make(chan *frame.Frame, maxPendingWrites),
|
|
readChannel: make(chan *frame.Frame, maxPendingReads),
|
|
txStore: &txStore{},
|
|
subList: NewSubscriptionList(),
|
|
subs: make(map[string]*Subscription),
|
|
log: config.Logger(),
|
|
}
|
|
go c.readLoop()
|
|
go c.processLoop()
|
|
return c
|
|
}
|
|
|
|
// Write a frame to the connection without requiring
|
|
// any acknowledgement.
|
|
func (c *Conn) Send(f *frame.Frame) {
|
|
// Place the frame on the write channel. If the
|
|
// write channel is full, the caller will block.
|
|
c.writeChannel <- f
|
|
}
|
|
|
|
// Send and ERROR message to the client. The client
|
|
// connection will disconnect as soon as the ERROR
|
|
// message has been transmitted. The message header
|
|
// will be based on the contents of the err parameter.
|
|
func (c *Conn) SendError(err error) {
|
|
f := frame.New(frame.ERROR, frame.Message, err.Error())
|
|
c.Send(f) // will close after successful send
|
|
}
|
|
|
|
// Send an ERROR frame to the client and immediately. The error
|
|
// message is derived from err. If f is non-nil, it is the frame
|
|
// whose contents have caused the error. Include the receipt-id
|
|
// header if the frame contains a receipt header.
|
|
func (c *Conn) sendErrorImmediately(err error, f *frame.Frame) {
|
|
errorFrame := frame.New(frame.ERROR,
|
|
frame.Message, err.Error())
|
|
|
|
// Include a receipt-id header if the frame that prompted the error had
|
|
// a receipt header (as suggested by the STOMP protocol spec).
|
|
if f != nil {
|
|
if receipt, ok := f.Header.Contains(frame.Receipt); ok {
|
|
errorFrame.Header.Add(frame.ReceiptId, receipt)
|
|
}
|
|
}
|
|
|
|
// send the frame to the client, ignore any error condition
|
|
// because we are about to close the connection anyway
|
|
_ = c.sendImmediately(errorFrame)
|
|
}
|
|
|
|
// Sends a STOMP frame to the client immediately, does not push onto the
|
|
// write channel to be processed in turn.
|
|
func (c *Conn) sendImmediately(f *frame.Frame) error {
|
|
return c.writer.Write(f)
|
|
}
|
|
|
|
// Go routine for reading bytes from a client and assembling into
|
|
// STOMP frames. Also handles heart-beat read timeout. All read
|
|
// frames are pushed onto the read channel to be processed by the
|
|
// processLoop go-routine. This keeps all processing of frames for
|
|
// this connection on the one go-routine and avoids race conditions.
|
|
func (c *Conn) readLoop() {
|
|
reader := frame.NewReader(c.rw)
|
|
expectingConnect := true
|
|
readTimeout := time.Duration(0)
|
|
for {
|
|
if readTimeout == time.Duration(0) {
|
|
// infinite timeout
|
|
c.rw.SetReadDeadline(time.Time{})
|
|
} else {
|
|
c.rw.SetReadDeadline(time.Now().Add(readTimeout * 2))
|
|
}
|
|
f, err := reader.Read()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
c.log.Errorf("connection closed: %s", c.rw.RemoteAddr())
|
|
} else {
|
|
c.log.Errorf("read failed: %v : %s", err, c.rw.RemoteAddr())
|
|
}
|
|
|
|
// Close the read channel so that the processing loop will
|
|
// know to terminate, if it has not already done so. This is
|
|
// the only channel that we close, because it is the only one
|
|
// we know who is writing to.
|
|
close(c.readChannel)
|
|
return
|
|
}
|
|
|
|
if f == nil {
|
|
// if the frame is nil, then it is a heartbeat
|
|
continue
|
|
}
|
|
|
|
// If we are expecting a CONNECT or STOMP command, extract
|
|
// the heart-beat header and work out the read timeout.
|
|
// Note that the processing loop will duplicate this to
|
|
// some extent, but letting this go-routine work out its own
|
|
// read timeout means no synchronization is necessary.
|
|
if expectingConnect {
|
|
// Expecting a CONNECT or STOMP command, get the heart-beat
|
|
cx, _, err := getHeartBeat(f)
|
|
|
|
// Ignore the error condition and treat as no read timeout.
|
|
// The processing loop will handle the error again and
|
|
// process correctly.
|
|
if err == nil {
|
|
// Minimum value as per server config. If the client
|
|
// has requested shorter periods than this value, the
|
|
// server will insist on the longer time period.
|
|
min := asMilliseconds(c.config.HeartBeat(), maxHeartBeat)
|
|
|
|
// apply a minimum heartbeat
|
|
if cx > 0 && cx < min {
|
|
cx = min
|
|
}
|
|
|
|
readTimeout = time.Duration(cx) * time.Millisecond
|
|
|
|
expectingConnect = false
|
|
}
|
|
}
|
|
|
|
// Add the frame to the read channel. Note that this will block
|
|
// if we are reading from the client quicker than the server
|
|
// can process frames.
|
|
c.readChannel <- f
|
|
}
|
|
}
|
|
|
|
// Go routine that processes all read frames and all write frames.
|
|
// Having all processing in one go routine helps eliminate any race conditions.
|
|
func (c *Conn) processLoop() {
|
|
defer c.cleanupConn()
|
|
|
|
c.writer = frame.NewWriter(c.rw)
|
|
c.stateFunc = connecting
|
|
|
|
var timerChannel <-chan time.Time
|
|
var timer *time.Timer
|
|
for {
|
|
if c.writeTimeout > 0 && timer == nil {
|
|
timer = time.NewTimer(c.writeTimeout)
|
|
timerChannel = timer.C
|
|
}
|
|
|
|
select {
|
|
case f, ok := <-c.writeChannel:
|
|
if !ok {
|
|
// write channel has been closed, so
|
|
// exit go-routine (after cleaning up)
|
|
return
|
|
}
|
|
|
|
// have a frame to the client with
|
|
// no acknowledgement required (topic)
|
|
|
|
// stop the heart-beat timer
|
|
if timer != nil {
|
|
timer.Stop()
|
|
timer = nil
|
|
}
|
|
|
|
c.allocateMessageId(f, nil)
|
|
|
|
// write the frame to the client
|
|
err := c.writer.Write(f)
|
|
if err != nil {
|
|
// if there is an error writing to
|
|
// the client, there is not much
|
|
// point trying to send an ERROR frame,
|
|
// so just exit go-routine (after cleaning up)
|
|
return
|
|
}
|
|
|
|
// if the frame just sent to the client is an error
|
|
// frame, we disconnect
|
|
if f.Command == frame.ERROR {
|
|
// sent an ERROR frame, so disconnect
|
|
return
|
|
}
|
|
|
|
case f, ok := <-c.readChannel:
|
|
if !ok {
|
|
// read channel has been closed, so
|
|
// exit go-routine (after cleaning up)
|
|
return
|
|
}
|
|
|
|
// Just received a frame from the client.
|
|
// Validate the frame, checking for mandatory
|
|
// headers and prohibited headers.
|
|
if c.validator != nil {
|
|
err := c.validator.Validate(f)
|
|
if err != nil {
|
|
c.log.Warningf("validation failed for %s frame: %v", f.Command, err)
|
|
c.sendErrorImmediately(err, f)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Pass to the appropriate function for handling
|
|
// according to the current state of the connection.
|
|
err := c.stateFunc(c, f)
|
|
if err != nil {
|
|
c.sendErrorImmediately(err, f)
|
|
return
|
|
}
|
|
|
|
case sub, ok := <-c.subChannel:
|
|
if !ok {
|
|
// subscription channel has been closed,
|
|
// so exit go-routine (after cleaning up)
|
|
return
|
|
}
|
|
|
|
// have a frame to the client which requires
|
|
// acknowledgement to the upper layer
|
|
|
|
// stop the heart-beat timer
|
|
if timer != nil {
|
|
timer.Stop()
|
|
timer = nil
|
|
}
|
|
|
|
// there is the possibility that the subscription
|
|
// has been unsubscribed just prior to receiving
|
|
// this, so we check
|
|
if _, ok = c.subs[sub.id]; ok {
|
|
// allocate a message-id, note that the
|
|
// subscription id has already been set
|
|
c.allocateMessageId(sub.frame, sub)
|
|
|
|
// write the frame to the client
|
|
err := c.writer.Write(sub.frame)
|
|
if err != nil {
|
|
// if there is an error writing to
|
|
// the client, there is not much
|
|
// point trying to send an ERROR frame,
|
|
// so just exit go-routine (after cleaning up)
|
|
return
|
|
}
|
|
|
|
if sub.ack == frame.AckAuto {
|
|
// subscription does not require acknowledgement,
|
|
// so send the subscription back the upper layer
|
|
// straight away
|
|
sub.frame = nil
|
|
c.requestChannel <- Request{Op: SubscribeOp, Sub: sub}
|
|
} else {
|
|
// subscription requires acknowledgement
|
|
c.subList.Add(sub)
|
|
}
|
|
} else {
|
|
// Subscription no longer exists, requeue
|
|
c.requestChannel <- Request{Op: RequeueOp, Frame: sub.frame}
|
|
}
|
|
|
|
case _ = <-timerChannel:
|
|
// stop the heart-beat timer
|
|
if timer != nil {
|
|
timer.Stop()
|
|
timer = nil
|
|
}
|
|
// write a heart-beat
|
|
err := c.writer.Write(nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Called when the connection is closing, and takes care of
|
|
// unsubscribing all subscriptions with the upper layer, and
|
|
// re-queueing all unacknowledged messages to the upper layer.
|
|
func (c *Conn) cleanupConn() {
|
|
// clean up any pending transactions
|
|
c.txStore.Init()
|
|
|
|
c.discardWriteChannelFrames()
|
|
|
|
// Unsubscribe every subscription known to the upper layer.
|
|
// This should be done before cleaning up the subscription
|
|
// channel. If we requeued messages before doing this,
|
|
// we might end up getting them back again.
|
|
for _, sub := range c.subs {
|
|
// Note that we only really need to send a request if the
|
|
// subscription does not have a frame, but for simplicity
|
|
// all subscriptions are unsubscribed from the upper layer.
|
|
c.requestChannel <- Request{Op: UnsubscribeOp, Sub: sub}
|
|
}
|
|
|
|
// Clear out the map of subscriptions
|
|
c.subs = nil
|
|
|
|
// Every subscription requiring acknowledgement has a frame
|
|
// that needs to be requeued in the upper layer
|
|
for sub := c.subList.Get(); sub != nil; sub = c.subList.Get() {
|
|
c.requestChannel <- Request{Op: RequeueOp, Frame: sub.frame}
|
|
}
|
|
|
|
// empty the subscription and write queue
|
|
c.discardWriteChannelFrames()
|
|
c.cleanupSubChannel()
|
|
|
|
// Tell the upper layer we are now disconnected
|
|
c.requestChannel <- Request{Op: DisconnectedOp, Conn: c}
|
|
|
|
// empty the subscription and write queue one more time
|
|
c.discardWriteChannelFrames()
|
|
c.cleanupSubChannel()
|
|
|
|
// Should not hurt to call this if it is already closed?
|
|
c.rw.Close()
|
|
}
|
|
|
|
// Discard anything on the write channel. These frames
|
|
// do not get acknowledged, and are either topic MESSAGE
|
|
// frames or ERROR frames.
|
|
func (c *Conn) discardWriteChannelFrames() {
|
|
for finished := false; !finished; {
|
|
select {
|
|
case _, ok := <-c.writeChannel:
|
|
if !ok {
|
|
finished = true
|
|
}
|
|
|
|
default:
|
|
finished = true
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Conn) cleanupSubChannel() {
|
|
// Read the subscription channel until it is empty.
|
|
// Each frame should be requeued to the upper layer.
|
|
for finished := false; !finished; {
|
|
select {
|
|
case sub, ok := <-c.subChannel:
|
|
if !ok {
|
|
finished = true
|
|
} else {
|
|
c.requestChannel <- Request{Op: RequeueOp, Frame: sub.frame}
|
|
}
|
|
|
|
default:
|
|
finished = true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send a frame to the client, allocating necessary headers prior.
|
|
func (c *Conn) allocateMessageId(f *frame.Frame, sub *Subscription) {
|
|
if f.Command == frame.MESSAGE || f.Command == frame.ACK {
|
|
// allocate the value of message-id for this frame
|
|
c.lastMsgId++
|
|
messageId := strconv.FormatUint(c.lastMsgId, 10)
|
|
f.Header.Set(frame.MessageId, messageId)
|
|
f.Header.Set(frame.Id, messageId)
|
|
|
|
// if there is any requirement by the client to acknowledge, set
|
|
// the ack header as per STOMP 1.2
|
|
if sub == nil || sub.ack == frame.AckAuto {
|
|
f.Header.Del(frame.Ack)
|
|
} else {
|
|
f.Header.Set(frame.Ack, messageId)
|
|
}
|
|
}
|
|
}
|
|
|
|
// State function for expecting connect frame.
|
|
func connecting(c *Conn, f *frame.Frame) error {
|
|
switch f.Command {
|
|
case frame.CONNECT, frame.STOMP:
|
|
return c.handleConnect(f)
|
|
}
|
|
return notConnected
|
|
}
|
|
|
|
// State function for after connect frame received.
|
|
func connected(c *Conn, f *frame.Frame) error {
|
|
switch f.Command {
|
|
case frame.CONNECT, frame.STOMP:
|
|
return unexpectedCommand
|
|
case frame.DISCONNECT:
|
|
return c.handleDisconnect(f)
|
|
case frame.BEGIN:
|
|
return c.handleBegin(f)
|
|
case frame.ABORT:
|
|
return c.handleAbort(f)
|
|
case frame.COMMIT:
|
|
return c.handleCommit(f)
|
|
case frame.SEND:
|
|
return c.handleSend(f)
|
|
case frame.SUBSCRIBE:
|
|
return c.handleSubscribe(f)
|
|
case frame.UNSUBSCRIBE:
|
|
return c.handleUnsubscribe(f)
|
|
case frame.ACK:
|
|
return c.handleAck(f)
|
|
case frame.NACK:
|
|
return c.handleNack(f)
|
|
case frame.MESSAGE, frame.RECEIPT, frame.ERROR:
|
|
// should only be sent by the server, should not come from the client
|
|
return unexpectedCommand
|
|
}
|
|
return unknownCommand
|
|
}
|
|
|
|
func (c *Conn) handleConnect(f *frame.Frame) error {
|
|
var err error
|
|
|
|
if _, ok := f.Header.Contains(frame.Receipt); ok {
|
|
// CONNNECT and STOMP frames are not allowed to have
|
|
// a receipt header.
|
|
return receiptInConnect
|
|
}
|
|
|
|
// if either of these fields are absent, pass nil to the
|
|
// authenticator function.
|
|
login, _ := f.Header.Contains(frame.Login)
|
|
passcode, _ := f.Header.Contains(frame.Passcode)
|
|
if !c.config.Authenticate(login, passcode) {
|
|
// sleep to slow down a rogue client a little bit
|
|
c.log.Error("authentication failed")
|
|
time.Sleep(time.Second)
|
|
return authenticationFailed
|
|
}
|
|
|
|
c.version, err = determineVersion(f)
|
|
if err != nil {
|
|
c.log.Error("protocol version negotiation failed")
|
|
return err
|
|
}
|
|
c.validator = stomp.NewValidator(c.version)
|
|
|
|
if c.version == stomp.V10 {
|
|
// don't want to handle V1.0 at the moment
|
|
// TODO: get working for V1.0
|
|
c.log.Errorf("unsupported version %s", c.version)
|
|
return unsupportedVersion
|
|
}
|
|
|
|
cx, cy, err := getHeartBeat(f)
|
|
if err != nil {
|
|
c.log.Error("invalid heart-beat")
|
|
return err
|
|
}
|
|
|
|
// Minimum value as per server config. If the client
|
|
// has requested shorter periods than this value, the
|
|
// server will insist on the longer time period.
|
|
min := asMilliseconds(c.config.HeartBeat(), maxHeartBeat)
|
|
|
|
// apply a minimum heartbeat
|
|
if cx > 0 && cx < min {
|
|
cx = min
|
|
}
|
|
if cy > 0 && cy < min {
|
|
cy = min
|
|
}
|
|
|
|
// the read timeout has already been processed in the readLoop
|
|
// go-routine
|
|
c.writeTimeout = time.Duration(cy) * time.Millisecond
|
|
|
|
/* TR-369 section 4.4.1.1 [Connecting a USP Endpoint to the STOMP Server] */
|
|
/*
|
|
R-STOMP.4: USP Endpoints sending a STOMP frame MUST include (in addition to other
|
|
mandatory STOMP headers) an endpoint-id STOMP header containing the
|
|
Endpoint ID of the USP Endpoint sending the frame.
|
|
*/
|
|
endpointId := f.Header.Get("endpoint-id")
|
|
|
|
response := frame.New(frame.CONNECTED,
|
|
frame.Version, string(c.version),
|
|
frame.Server, "stompd/1.2",
|
|
frame.HeartBeat, fmt.Sprintf("%d,%d", cy, cx),
|
|
frame.SubscribeDest, "oktopus/usp/v1/agent/"+endpointId,
|
|
)
|
|
|
|
c.sendImmediately(response)
|
|
c.stateFunc = connected
|
|
|
|
// tell the upper layer we are connected
|
|
c.requestChannel <- Request{Op: ConnectedOp, Conn: c}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Sends a RECEIPT frame to the client if the frame f contains
|
|
// a receipt header. If the frame does contain a receipt header,
|
|
// it will be removed from the frame.
|
|
func (c *Conn) sendReceiptImmediately(f *frame.Frame) error {
|
|
if receipt, ok := f.Header.Contains(frame.Receipt); ok {
|
|
// Remove the receipt header from the frame. This is handy
|
|
// for transactions, because the frame has its receipt
|
|
// header removed prior to entering the transaction store.
|
|
// When the frame is processed upon transaction commit, it
|
|
// will not have a receipt header anymore.
|
|
f.Header.Del(frame.Receipt)
|
|
return c.sendImmediately(frame.New(frame.RECEIPT,
|
|
frame.ReceiptId, receipt))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) handleDisconnect(f *frame.Frame) error {
|
|
// As soon as we receive a DISCONNECT frame from a client, we do
|
|
// not want to send any more frames to that client, with the exception
|
|
// of a RECEIPT frame if the client has requested one.
|
|
// Ignore the error condition if we cannot send a RECEIPT frame,
|
|
// as the connection is about to close anyway.
|
|
_ = c.sendReceiptImmediately(f)
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) handleBegin(f *frame.Frame) error {
|
|
// the frame should already have been validated for the
|
|
// transaction header, but we check again here.
|
|
if transaction, ok := f.Header.Contains(frame.Transaction); ok {
|
|
// Send a receipt and remove the header
|
|
err := c.sendReceiptImmediately(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.txStore.Begin(transaction)
|
|
}
|
|
return missingHeader(frame.Transaction)
|
|
}
|
|
|
|
func (c *Conn) handleCommit(f *frame.Frame) error {
|
|
// the frame should already have been validated for the
|
|
// transaction header, but we check again here.
|
|
if transaction, ok := f.Header.Contains(frame.Transaction); ok {
|
|
// Send a receipt and remove the header
|
|
err := c.sendReceiptImmediately(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.txStore.Commit(transaction, func(f *frame.Frame) error {
|
|
// Call the state function (again) for each frame in the
|
|
// transaction. This time each frame is stripped of its transaction
|
|
// header (and its receipt header as well, if it had one).
|
|
return c.stateFunc(c, f)
|
|
})
|
|
}
|
|
return missingHeader(frame.Transaction)
|
|
}
|
|
|
|
func (c *Conn) handleAbort(f *frame.Frame) error {
|
|
// the frame should already have been validated for the
|
|
// transaction header, but we check again here.
|
|
if transaction, ok := f.Header.Contains(frame.Transaction); ok {
|
|
// Send a receipt and remove the header
|
|
err := c.sendReceiptImmediately(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.txStore.Abort(transaction)
|
|
}
|
|
return missingHeader(frame.Transaction)
|
|
}
|
|
|
|
func (c *Conn) handleSubscribe(f *frame.Frame) error {
|
|
id, ok := f.Header.Contains(frame.Id)
|
|
if !ok {
|
|
return missingHeader(frame.Id)
|
|
}
|
|
|
|
dest, ok := f.Header.Contains(frame.Destination)
|
|
if !ok {
|
|
return missingHeader(frame.Destination)
|
|
}
|
|
|
|
ack, ok := f.Header.Contains(frame.Ack)
|
|
if !ok {
|
|
ack = frame.AckAuto
|
|
}
|
|
|
|
sub, ok := c.subs[id]
|
|
if ok {
|
|
return subscriptionExists
|
|
}
|
|
|
|
sub = newSubscription(c, dest, id, ack)
|
|
c.subs[id] = sub
|
|
|
|
// send information about new subscription to upper layer
|
|
c.requestChannel <- Request{Op: SubscribeOp, Sub: sub}
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) handleUnsubscribe(f *frame.Frame) error {
|
|
id, ok := f.Header.Contains(frame.Id)
|
|
if !ok {
|
|
return missingHeader(frame.Id)
|
|
}
|
|
|
|
sub, ok := c.subs[id]
|
|
if !ok {
|
|
return subscriptionNotFound
|
|
}
|
|
|
|
// remove the subscription
|
|
delete(c.subs, id)
|
|
|
|
// tell the upper layer of the unsubscribe
|
|
c.requestChannel <- Request{Op: UnsubscribeOp, Sub: sub}
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) handleAck(f *frame.Frame) error {
|
|
var err error
|
|
var msgId string
|
|
|
|
if ack, ok := f.Header.Contains(frame.Ack); ok {
|
|
msgId = ack
|
|
} else if msgId, ok = f.Header.Contains(frame.MessageId); !ok {
|
|
return missingHeader(frame.MessageId)
|
|
}
|
|
|
|
// expecting message id to be a uint64
|
|
msgId64, err := strconv.ParseUint(msgId, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Send a receipt and remove the header
|
|
err = c.sendReceiptImmediately(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if tx, ok := f.Header.Contains(frame.Transaction); ok {
|
|
// the transaction header is removed from the frame
|
|
err = c.txStore.Add(tx, f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// handle any subscriptions that are acknowledged by this msg
|
|
c.subList.Ack(msgId64, func(s *Subscription) {
|
|
// remove frame from the subscription, it has been delivered
|
|
s.frame = nil
|
|
|
|
// let the upper layer know that this subscription
|
|
// is ready for another frame
|
|
c.requestChannel <- Request{Op: SubscribeOp, Sub: s}
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) handleNack(f *frame.Frame) error {
|
|
var err error
|
|
var msgId string
|
|
|
|
if ack, ok := f.Header.Contains(frame.Ack); ok {
|
|
msgId = ack
|
|
} else if msgId, ok = f.Header.Contains(frame.MessageId); !ok {
|
|
return missingHeader(frame.MessageId)
|
|
}
|
|
|
|
// expecting message id to be a uint64
|
|
msgId64, err := strconv.ParseUint(msgId, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Send a receipt and remove the header
|
|
err = c.sendReceiptImmediately(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if tx, ok := f.Header.Contains(frame.Transaction); ok {
|
|
// the transaction header is removed from the frame
|
|
err = c.txStore.Add(tx, f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// handle any subscriptions that are acknowledged by this msg
|
|
c.subList.Nack(msgId64, func(s *Subscription) {
|
|
// send frame back to upper layer for requeue
|
|
c.requestChannel <- Request{Op: RequeueOp, Frame: s.frame}
|
|
|
|
// remove frame from the subscription, it has been requeued
|
|
s.frame = nil
|
|
|
|
// let the upper layer know that this subscription
|
|
// is ready for another frame
|
|
c.requestChannel <- Request{Op: SubscribeOp, Sub: s}
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Handle a SEND frame received from the client. Note that
|
|
// this method is called after a SEND message is received,
|
|
// but also after a transaction commit.
|
|
func (c *Conn) handleSend(f *frame.Frame) error {
|
|
// Send a receipt and remove the header
|
|
err := c.sendReceiptImmediately(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if tx, ok := f.Header.Contains(frame.Transaction); ok {
|
|
// the transaction header is removed from the frame
|
|
err = c.txStore.Add(tx, f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// not in a transaction
|
|
// change from SEND to MESSAGE
|
|
f.Command = frame.MESSAGE
|
|
c.requestChannel <- Request{Op: EnqueueOp, Frame: f}
|
|
}
|
|
|
|
return nil
|
|
}
|