oktopus/backend/services/mtp/stomp-adapter/internal/stomp/conn_options.go

342 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package stomp
import (
"fmt"
"strings"
"time"
"github.com/oktopUSP/oktopus/backend/services/mtp/stomp-adapter/internal/stomp/frame"
"github.com/oktopUSP/oktopus/backend/services/mtp/stomp-adapter/internal/stomp/internal/log"
)
// ConnOptions is an opaque structure used to collection options
// for connecting to the other server.
type connOptions struct {
FrameCommand string
Host string
ReadTimeout time.Duration
WriteTimeout time.Duration
HeartBeatError time.Duration
MsgSendTimeout time.Duration
RcvReceiptTimeout time.Duration
DisconnectReceiptTimeout time.Duration
UnsubscribeReceiptTimeout time.Duration
HeartBeatGracePeriodMultiplier float64
Login, Passcode string
AcceptVersions []string
Header *frame.Header
ReadChannelCapacity, WriteChannelCapacity int
ReadBufferSize, WriteBufferSize int
ResponseHeadersCallback func(*frame.Header)
Logger Logger
}
func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error) {
co := &connOptions{
FrameCommand: frame.CONNECT,
ReadTimeout: time.Minute,
WriteTimeout: time.Minute,
HeartBeatGracePeriodMultiplier: 1.0,
HeartBeatError: DefaultHeartBeatError,
MsgSendTimeout: DefaultMsgSendTimeout,
RcvReceiptTimeout: DefaultRcvReceiptTimeout,
DisconnectReceiptTimeout: DefaultDisconnectReceiptTimeout,
UnsubscribeReceiptTimeout: DefaultUnsubscribeReceiptTimeout,
Logger: log.StdLogger{},
}
// This is a slight of hand, attach the options to the Conn long
// enough to run the options functions and then detach again.
// The reason we do this is to allow for future options to be able
// to modify the Conn object itself, in case that becomes desirable.
conn.options = co
defer func() { conn.options = nil }()
// compatibility with previous version: ignore nil options
for _, opt := range opts {
if opt != nil {
err := opt(conn)
if err != nil {
return nil, err
}
}
}
if len(co.AcceptVersions) == 0 {
co.AcceptVersions = append(co.AcceptVersions, string(V10), string(V11), string(V12))
}
return co, nil
}
func (co *connOptions) NewFrame() (*frame.Frame, error) {
f := frame.New(co.FrameCommand)
if co.Host != "" {
f.Header.Set(frame.Host, co.Host)
}
// heart-beat
{
send := co.WriteTimeout / time.Millisecond
recv := co.ReadTimeout / time.Millisecond
f.Header.Set(frame.HeartBeat, fmt.Sprintf("%d,%d", send, recv))
}
// login, passcode
if co.Login != "" || co.Passcode != "" {
f.Header.Set(frame.Login, co.Login)
f.Header.Set(frame.Passcode, co.Passcode)
}
// accept-version
f.Header.Set(frame.AcceptVersion, strings.Join(co.AcceptVersions, ","))
// custom header entries -- note that these do not override
// header values already set as they are added to the end of
// the header array
f.Header.AddHeader(co.Header)
return f, nil
}
// Options for connecting to the STOMP server. Used with the
// stomp.Dial and stomp.Connect functions, both of which have examples.
var ConnOpt struct {
// Login is a connect option that allows the calling program to
// specify the "login" and "passcode" values to send to the STOMP
// server.
Login func(login, passcode string) func(*Conn) error
// Host is a connect option that allows the calling program to
// specify the value of the "host" header.
Host func(host string) func(*Conn) error
// UseStomp is a connect option that specifies that the client
// should use the "STOMP" command instead of the "CONNECT" command.
// Note that using "STOMP" is only valid for STOMP version 1.1 and later.
UseStomp func(*Conn) error
// AcceptVersoin is a connect option that allows the client to
// specify one or more versions of the STOMP protocol that the
// client program is prepared to accept. If this option is not
// specified, the client program will accept any of STOMP versions
// 1.0, 1.1 or 1.2.
AcceptVersion func(versions ...Version) func(*Conn) error
// HeartBeat is a connect option that allows the client to specify
// the send and receive timeouts for the STOMP heartbeat negotiation mechanism.
// The sendTimeout parameter specifies the maximum amount of time
// between the client sending heartbeat notifications from the server.
// The recvTimeout paramter specifies the minimum amount of time between
// the client expecting to receive heartbeat notifications from the server.
// If not specified, this option defaults to one minute for both send and receive
// timeouts.
HeartBeat func(sendTimeout, recvTimeout time.Duration) func(*Conn) error
// HeartBeatError is a connect option that will normally only be specified during
// testing. It specifies a short time duration that is larger than the amount of time
// that will take for a STOMP frame to be transmitted from one station to the other.
// When not specified, this value defaults to 5 seconds. This value is set to a much
// shorter time duration during unit testing.
HeartBeatError func(errorTimeout time.Duration) func(*Conn) error
// MsgSendTimeout is a connect option that allows the client to specify
// the timeout for the Conn.Send function.
// The msgSendTimeout parameter specifies maximum blocking time for calling
// the Conn.Send function.
// If not specified, this option defaults to 10 seconds.
// Less than or equal to zero means infinite
MsgSendTimeout func(msgSendTimeout time.Duration) func(*Conn) error
// RcvReceiptTimeout is a connect option that allows the client to specify
// how long to wait for a receipt in the Conn.Send function. This helps
// avoid deadlocks. If this is not specified, the default is 30 seconds.
RcvReceiptTimeout func(rcvReceiptTimeout time.Duration) func(*Conn) error
// DisconnectReceiptTimeout is a connect option that allows the client to specify
// how long to wait for a receipt in the Conn.Disconnect function. This helps
// avoid deadlocks. If this is not specified, the default is 30 seconds.
DisconnectReceiptTimeout func(disconnectReceiptTimeout time.Duration) func(*Conn) error
// UnsubscribeReceiptTimeout is a connect option that allows the client to specify
// how long to wait for a receipt in the Conn.Unsubscribe function. This helps
// avoid deadlocks. If this is not specified, the default is 30 seconds.
UnsubscribeReceiptTimeout func(unsubscribeReceiptTimeout time.Duration) func(*Conn) error
// HeartBeatGracePeriodMultiplier is used to calculate the effective read heart-beat timeout
// the broker will enforce for each clients connection. The multiplier is applied to
// the read-timeout interval the client specifies in its CONNECT frame
HeartBeatGracePeriodMultiplier func(multiplier float64) func(*Conn) error
// Header is a connect option that allows the client to specify a custom
// header entry in the STOMP frame. This connect option can be specified
// multiple times for multiple custom headers.
Header func(key, value string) func(*Conn) error
// ReadChannelCapacity is the number of messages that can be on the read channel at the
// same time. A high number may affect memory usage while a too low number may lock the
// system up. Default is set to 20.
ReadChannelCapacity func(capacity int) func(*Conn) error
// WriteChannelCapacity is the number of messages that can be on the write channel at the
// same time. A high number may affect memory usage while a too low number may lock the
// system up. Default is set to 20.
WriteChannelCapacity func(capacity int) func(*Conn) error
// ReadBufferSize specifies number of bytes that can be used to read the message
// A high number may affect memory usage while a too low number may lock the
// system up. Default is set to 4096.
ReadBufferSize func(size int) func(*Conn) error
// WriteBufferSize specifies number of bytes that can be used to write the message
// A high number may affect memory usage while a too low number may lock the
// system up. Default is set to 4096.
WriteBufferSize func(size int) func(*Conn) error
// ResponseHeaders lets you provide a callback function to get the headers from the CONNECT response
ResponseHeaders func(func(*frame.Header)) func(*Conn) error
// Logger lets you provide a callback function that sets the logger used by a connection
Logger func(logger Logger) func(*Conn) error
}
func init() {
ConnOpt.Login = func(login, passcode string) func(*Conn) error {
return func(c *Conn) error {
c.options.Login = login
c.options.Passcode = passcode
return nil
}
}
ConnOpt.Host = func(host string) func(*Conn) error {
return func(c *Conn) error {
c.options.Host = host
return nil
}
}
ConnOpt.UseStomp = func(c *Conn) error {
c.options.FrameCommand = frame.STOMP
return nil
}
ConnOpt.AcceptVersion = func(versions ...Version) func(*Conn) error {
return func(c *Conn) error {
for _, version := range versions {
if err := version.CheckSupported(); err != nil {
return err
}
c.options.AcceptVersions = append(c.options.AcceptVersions, string(version))
}
return nil
}
}
ConnOpt.HeartBeat = func(sendTimeout, recvTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.WriteTimeout = sendTimeout
c.options.ReadTimeout = recvTimeout
return nil
}
}
ConnOpt.HeartBeatError = func(errorTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.HeartBeatError = errorTimeout
return nil
}
}
ConnOpt.MsgSendTimeout = func(msgSendTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.MsgSendTimeout = msgSendTimeout
return nil
}
}
ConnOpt.RcvReceiptTimeout = func(rcvReceiptTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.RcvReceiptTimeout = rcvReceiptTimeout
return nil
}
}
ConnOpt.DisconnectReceiptTimeout = func(disconnectReceiptTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.DisconnectReceiptTimeout = disconnectReceiptTimeout
return nil
}
}
ConnOpt.UnsubscribeReceiptTimeout = func(unsubscribeReceiptTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.UnsubscribeReceiptTimeout = unsubscribeReceiptTimeout
return nil
}
}
ConnOpt.HeartBeatGracePeriodMultiplier = func(multiplier float64) func(*Conn) error {
return func(c *Conn) error {
c.options.HeartBeatGracePeriodMultiplier = multiplier
return nil
}
}
ConnOpt.Header = func(key, value string) func(*Conn) error {
return func(c *Conn) error {
if c.options.Header == nil {
c.options.Header = frame.NewHeader(key, value)
} else {
c.options.Header.Add(key, value)
}
return nil
}
}
ConnOpt.ReadChannelCapacity = func(capacity int) func(*Conn) error {
return func(c *Conn) error {
c.options.ReadChannelCapacity = capacity
return nil
}
}
ConnOpt.WriteChannelCapacity = func(capacity int) func(*Conn) error {
return func(c *Conn) error {
c.options.WriteChannelCapacity = capacity
return nil
}
}
ConnOpt.ReadBufferSize = func(size int) func(*Conn) error {
return func(c *Conn) error {
c.options.ReadBufferSize = size
return nil
}
}
ConnOpt.WriteBufferSize = func(size int) func(*Conn) error {
return func(c *Conn) error {
c.options.WriteBufferSize = size
return nil
}
}
ConnOpt.ResponseHeaders = func(callback func(*frame.Header)) func(*Conn) error {
return func(c *Conn) error {
c.options.ResponseHeadersCallback = callback
return nil
}
}
ConnOpt.Logger = func(log Logger) func(*Conn) error {
return func(c *Conn) error {
if log != nil {
c.options.Logger = log
}
return nil
}
}
}