sessions

package
v0.0.0-...-ce34cbf
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2019 License: Apache-2.0 Imports: 8 Imported by: 4

Documentation

Constants

This section is empty.

Variables

var (
	ErrSessionsProviderNotFound = errors.New("Session: Session provider not found")
	ErrKeyNotAvailable          = errors.New("Session: not item found for key.")
)

Functions

func Register

func Register(name string, provider Constructor)

Register makes a session provider available under the given name. If Register is called twice with the same name, or if provider is nil, it panics.
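
The registration rules above can be illustrated with a minimal, self-contained sketch. It does not reproduce this package's source: the Provider interface, the providers map, and the panics and newEmpty helpers are hypothetical stand-ins, and the behavior shown for Unregister (freeing the name) is an assumption, since the page does not describe it.

```go
package main

import (
	"fmt"
	"sync"
)

// Provider is a hypothetical, cut-down stand-in for SessionsProvider.
type Provider interface {
	Count() int
}

// Constructor mirrors the shape of the package's Constructor type.
type Constructor func(context fmt.Stringer) Provider

var (
	mu        sync.RWMutex
	providers = make(map[string]Constructor) // hypothetical registry
)

// Register panics on a nil constructor or a duplicate name.
func Register(name string, c Constructor) {
	mu.Lock()
	defer mu.Unlock() // still runs while a panic unwinds
	if c == nil {
		panic("sessions: Register provider is nil")
	}
	if _, dup := providers[name]; dup {
		panic("sessions: Register called twice for provider " + name)
	}
	providers[name] = c
}

// Unregister removes the name (assumed behavior for this sketch).
func Unregister(name string) {
	mu.Lock()
	defer mu.Unlock()
	delete(providers, name)
}

type emptyProvider struct{}

func (emptyProvider) Count() int { return 0 }

// newEmpty is a hypothetical constructor used only for the demo.
func newEmpty(fmt.Stringer) Provider { return emptyProvider{} }

// panics reports whether f panicked.
func panics(f func()) (did bool) {
	defer func() {
		if recover() != nil {
			did = true
		}
	}()
	f()
	return false
}

func main() {
	Register("mem", newEmpty)
	fmt.Println(panics(func() { Register("mem", newEmpty) })) // true: duplicate name
	fmt.Println(panics(func() { Register("nil", nil) }))      // true: nil constructor
	Unregister("mem")
	fmt.Println(panics(func() { Register("mem", newEmpty) })) // false: name is free again
}
```

Panicking instead of returning an error follows the convention used by registries that are filled from init functions, where a duplicate name is a programming mistake and not a runtime condition.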

func Unregister

func Unregister(name string)

Types

type Ackqueue

type Ackqueue struct {
	Size  int64
	Mask  int64
	Count int64
	Head  int64
	Tail  int64

	Ping ackmsg
	Ring []ackmsg
	Emap map[string]int64

	Ackdone []ackmsg
	// contains filtered or unexported fields
}

Ackqueue is a growable queue built on a ring buffer. When the buffer fills up, it grows automatically.
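
As a rough illustration of the idea, here is a minimal growable ring buffer over plain ints. The ringQueue type and its methods are hypothetical and are not the package's implementation. The power-of-two sizing is an assumption suggested by the Size and Mask fields, and doubling on growth is a choice made for this sketch.

```go
package main

import "fmt"

// ringQueue is a hypothetical FIFO queue over a ring buffer.
type ringQueue struct {
	ring  []int
	mask  int // len(ring)-1; valid because len(ring) is a power of two
	head  int // index of the oldest element
	tail  int // index of the next free slot
	count int
}

// newRingQueue rounds size up to the next power of two.
func newRingQueue(size int) *ringQueue {
	n := 1
	for n < size {
		n <<= 1
	}
	return &ringQueue{ring: make([]int, n), mask: n - 1}
}

// push appends v, doubling the buffer first if it is full.
func (q *ringQueue) push(v int) {
	if q.count == len(q.ring) {
		q.grow()
	}
	q.ring[q.tail] = v
	q.tail = (q.tail + 1) & q.mask // wrap around using the mask
	q.count++
}

// pop removes and returns the oldest element.
func (q *ringQueue) pop() (int, bool) {
	if q.count == 0 {
		return 0, false
	}
	v := q.ring[q.head]
	q.head = (q.head + 1) & q.mask
	q.count--
	return v, true
}

// grow doubles the buffer and copies the elements in FIFO order,
// so that head restarts at index 0.
func (q *ringQueue) grow() {
	bigger := make([]int, len(q.ring)*2)
	n := copy(bigger, q.ring[q.head:])
	copy(bigger[n:], q.ring[:q.head])
	q.ring, q.mask = bigger, len(bigger)-1
	q.head, q.tail = 0, q.count
}

func main() {
	q := newRingQueue(2)
	for i := 1; i <= 5; i++ {
		q.push(i)
	}
	fmt.Println(len(q.ring), q.count) // 8 5
	for v, ok := q.pop(); ok; v, ok = q.pop() {
		fmt.Print(v, " ")
	}
	fmt.Println()
}
```

Keeping the length a power of two lets the index wrap with a bitwise AND instead of a modulo.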

Ackqueue stores messages that are waiting for their acks to come back. Acks are required in the following scenarios:

  1. Client sends SUBSCRIBE message to server, waits for SUBACK.
  2. Client sends UNSUBSCRIBE message to server, waits for UNSUBACK.
  3. Client sends PUBLISH QoS 1 message to server, waits for PUBACK.
  4. Server sends PUBLISH QoS 1 message to client, waits for PUBACK.
  5. Client sends PUBLISH QoS 2 message to server, waits for PUBREC.
  6. Server sends PUBREC message to client, waits for PUBREL.
  7. Client sends PUBREL message to server, waits for PUBCOMP.
  8. Server sends PUBLISH QoS 2 message to client, waits for PUBREC.
  9. Client sends PUBREC message to server, waits for PUBREL.
  10. Server sends PUBREL message to client, waits for PUBCOMP.
  11. Client sends PINGREQ message to server, waits for PINGRESP.
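
The scenarios above reduce to a small lookup from the packet sent to the packet awaited. The sketch below encodes that table. The expectedAck function is a hypothetical helper written for illustration and is not part of this package; packet types are plain strings to keep the example self-contained.

```go
package main

import "fmt"

// expectedAck returns the packet type that completes the wait started by
// sending the given packet. qos matters only for PUBLISH. The second result
// is false when no ack is expected (for example PUBLISH with QoS 0).
func expectedAck(sent string, qos byte) (string, bool) {
	switch sent {
	case "SUBSCRIBE":
		return "SUBACK", true
	case "UNSUBSCRIBE":
		return "UNSUBACK", true
	case "PINGREQ":
		return "PINGRESP", true
	case "PUBREC":
		return "PUBREL", true
	case "PUBREL":
		return "PUBCOMP", true
	case "PUBLISH":
		switch qos {
		case 1:
			return "PUBACK", true
		case 2:
			return "PUBREC", true
		}
	}
	return "", false
}

func main() {
	// Follow the QoS 2 handshake from the first packet to the last.
	step, chain := "PUBLISH", "PUBLISH"
	for {
		next, ok := expectedAck(step, 2)
		if !ok {
			break
		}
		chain += " -> " + next
		step = next
	}
	fmt.Println(chain) // PUBLISH -> PUBREC -> PUBREL -> PUBCOMP
}
```

The same table applies in both directions, which is why the list pairs each client-initiated scenario with a matching server-initiated one.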

func (*Ackqueue) Ack

func (this *Ackqueue) Ack(msg message.Message) error

Ack() takes the supplied ack message and updates the status of the messages waiting for it.

func (*Ackqueue) Acked

func (this *Ackqueue) Acked() []ackmsg

Acked() returns the list of messages that have completed the ack cycle.

func (*Ackqueue) Wait

func (this *Ackqueue) Wait(msg message.Message, onComplete interface{}) error

Wait() copies the message into a waiting queue, and waits for the corresponding ack message to be received.
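
The Wait, Ack, Acked cycle can be pictured with a toy model keyed by packet identifier. Everything in it is hypothetical: the toyAckQueue type, its lowercase methods, the func-typed callback (the real Wait takes an interface{}), and the choice to release completed entries only from the front of the queue are assumptions made for this sketch, not the package's behavior.

```go
package main

import "fmt"

// pending is one message waiting for its ack.
type pending struct {
	id         uint16
	done       bool
	onComplete func(id uint16)
}

// toyAckQueue is a hypothetical model of the wait/ack/acked cycle.
type toyAckQueue struct {
	order []*pending          // arrival order
	byID  map[uint16]*pending // lookup by packet identifier
}

func newToyAckQueue() *toyAckQueue {
	return &toyAckQueue{byID: make(map[uint16]*pending)}
}

// wait records a message that now needs an ack.
func (q *toyAckQueue) wait(id uint16, onComplete func(uint16)) error {
	if _, ok := q.byID[id]; ok {
		return fmt.Errorf("packet id %d is already waiting", id)
	}
	p := &pending{id: id, onComplete: onComplete}
	q.order = append(q.order, p)
	q.byID[id] = p
	return nil
}

// ack marks the matching waiting message as done.
func (q *toyAckQueue) ack(id uint16) error {
	p, ok := q.byID[id]
	if !ok {
		return fmt.Errorf("no message waiting for packet id %d", id)
	}
	p.done = true
	return nil
}

// acked removes and returns the completed entries at the front of the
// queue, calling each entry's callback (in-order release is an assumption).
func (q *toyAckQueue) acked() []uint16 {
	var out []uint16
	for len(q.order) > 0 && q.order[0].done {
		p := q.order[0]
		q.order = q.order[1:]
		delete(q.byID, p.id)
		if p.onComplete != nil {
			p.onComplete(p.id)
		}
		out = append(out, p.id)
	}
	return out
}

func main() {
	q := newToyAckQueue()
	for _, id := range []uint16{1, 2, 3} {
		_ = q.wait(id, nil)
	}
	_ = q.ack(2)
	fmt.Println(q.acked()) // []: id 1 is still pending at the front
	_ = q.ack(1)
	fmt.Println(q.acked()) // [1 2]
}
```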

type Constructor

type Constructor func(context fmt.Stringer) SessionsProvider

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(providerName string, context fmt.Stringer) (*Manager, error)

func (*Manager) Close

func (this *Manager) Close() error

func (*Manager) Count

func (this *Manager) Count() int

func (*Manager) Del

func (this *Manager) Del(id string)

func (*Manager) Get

func (this *Manager) Get(id string) (*Session, error)

func (*Manager) New

func (this *Manager) New(id string) (*Session, error)

func (*Manager) Save

func (this *Manager) Save(id string, session *Session) error

type Session

type Session struct {
	// Ack queue for outgoing PUBLISH QoS 1 messages
	Pub1ack *Ackqueue

	// Ack queue for incoming PUBLISH QoS 2 messages
	Pub2in *Ackqueue

	// Ack queue for outgoing PUBLISH QoS 2 messages
	Pub2out *Ackqueue

	// Ack queue for outgoing SUBSCRIBE messages
	Suback *Ackqueue

	// Ack queue for outgoing UNSUBSCRIBE messages
	Unsuback *Ackqueue

	// Ack queue for outgoing PINGREQ messages
	Pingack *Ackqueue

	// Cmsg is the CONNECT message
	Cmsg *message.ConnectMessage `json:"-"`

	// Will message to publish if the connection is closed unexpectedly
	Will *message.PublishMessage

	// Retained publish message
	Retained *message.PublishMessage

	Id string

	SessionTopics `json:"-"`
	// contains filtered or unexported fields
}

func (*Session) ID

func (this *Session) ID() string

func (*Session) Init

func (this *Session) Init(msg *message.ConnectMessage) error

func (*Session) RetainMessage

func (this *Session) RetainMessage(msg *message.PublishMessage) error

func (*Session) Update

func (this *Session) Update(msg *message.ConnectMessage) error

type SessionTopics

type SessionTopics interface {
	InitTopics(msg *message.ConnectMessage) error
	AddTopic(topic string, qos byte) error
	RemoveTopic(topic string) error
	Topics() ([]string, []byte, error)
}
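
One way to satisfy the topic-tracking part of this interface is a map from topic to QoS. The sketch below is an assumption-laden illustration and not the package's implementation: memTopics is a hypothetical type, InitTopics is left out because it needs the message package, and sorting the result of Topics is a choice made here so that the two parallel slices come back in a stable order.

```go
package main

import (
	"fmt"
	"sort"
)

// memTopics is a hypothetical in-memory topic set.
type memTopics struct {
	topics map[string]byte // topic filter -> granted QoS
}

func newMemTopics() *memTopics {
	return &memTopics{topics: make(map[string]byte)}
}

// AddTopic stores or overwrites the QoS for a topic. MQTT defines only
// QoS levels 0, 1 and 2, so anything higher is rejected.
func (m *memTopics) AddTopic(topic string, qos byte) error {
	if qos > 2 {
		return fmt.Errorf("invalid QoS %d for topic %q", qos, topic)
	}
	m.topics[topic] = qos
	return nil
}

// RemoveTopic deletes a topic; removing an unknown topic is a no-op here.
func (m *memTopics) RemoveTopic(topic string) error {
	delete(m.topics, topic)
	return nil
}

// Topics returns parallel slices: names[i] was subscribed with qoss[i].
func (m *memTopics) Topics() ([]string, []byte, error) {
	names := make([]string, 0, len(m.topics))
	for t := range m.topics {
		names = append(names, t)
	}
	sort.Strings(names) // map iteration order is random; sort for stability
	qoss := make([]byte, len(names))
	for i, t := range names {
		qoss[i] = m.topics[t]
	}
	return names, qoss, nil
}

func main() {
	m := newMemTopics()
	_ = m.AddTopic("sensors/+/temp", 1)
	_ = m.AddTopic("alerts/#", 2)
	_ = m.RemoveTopic("sensors/+/temp")
	names, qoss, _ := m.Topics()
	fmt.Println(names, qoss)
}
```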

type SessionsProvider

type SessionsProvider interface {
	New(id string) (*Session, error)
	Get(id string) (*Session, error)
	Del(id string)
	Save(id string, session *Session) error
	Count() int
	Close() error
}

func NewMemProvider

func NewMemProvider(context fmt.Stringer) SessionsProvider
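
The name suggests an in-memory provider. Assuming that, a store with the same method set as SessionsProvider could be sketched as a mutex-guarded map. The code below is illustrative only: Session is reduced to a hypothetical struct with just an Id field, and memStore, newMemStore and errNotFound are names invented for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Session is a hypothetical, cut-down stand-in for the package's Session.
type Session struct {
	Id string
}

// errNotFound is a hypothetical error for missing sessions.
var errNotFound = errors.New("session not found")

// memStore keeps sessions in a map guarded by a read/write mutex.
type memStore struct {
	mu sync.RWMutex
	st map[string]*Session
}

func newMemStore() *memStore {
	return &memStore{st: make(map[string]*Session)}
}

// New creates and stores a session, replacing any existing one.
func (m *memStore) New(id string) (*Session, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	s := &Session{Id: id}
	m.st[id] = s
	return s, nil
}

// Get returns the stored session or errNotFound.
func (m *memStore) Get(id string) (*Session, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	s, ok := m.st[id]
	if !ok {
		return nil, errNotFound
	}
	return s, nil
}

// Del removes a session; unknown ids are ignored.
func (m *memStore) Del(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.st, id)
}

// Save stores the given session under id.
func (m *memStore) Save(id string, s *Session) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.st[id] = s
	return nil
}

// Count returns the number of stored sessions.
func (m *memStore) Count() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.st)
}

// Close drops all sessions.
func (m *memStore) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.st = make(map[string]*Session)
	return nil
}

func main() {
	store := newMemStore()
	_, _ = store.New("client-1")
	fmt.Println(store.Count()) // 1
	store.Del("client-1")
	_, err := store.Get("client-1")
	fmt.Println(err) // session not found
}
```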
