Golang📌常用包📌amqp091-go.txt
"github.com/rabbitmq/amqp091-go"是一个由RabbitMQ核心团队维护的amqp0.9.1的Go客户端。
包内未提供资源的池化和自动重连,需自行实现。
========== ========== ========== ========== ==========
建立连接:
func Dial(url string) (*Connection, error) 等价于 DialTLS(url string, nil)
func DialConfig(url string, config Config) (*Connection, error)
func DialTLS(url string, amqps *tls.Config) (*Connection, error)
func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error)
func Open(conn io.ReadWriteCloser, config Config) (*Connection, error)
url参数格式为:scheme://username:password@address[/vhost],scheme为amqp或amqps,例如:amqp://guest:guest@localhost:5672
未指定Config参数时,服务器心跳间隔默认为10秒,初始读取截止时间设置为30秒。
func (c *Connection) Channel() (*Channel, error)
打开一个独特的、可在连接上并发的通道,用于处理大量amqp消息。
如果当前通道的方法出现任何错误,都意味着这个通道失效,应该打开一个新的通道。
========== ========== ========== ========== ==========
生产环境,交换机和队列的声明与绑定应在管理后台统一操作。
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error
========== ========== ========== ========== ==========
type Publishing struct {
Headers Table
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient(0/1)暂态,更高的吞吐量,但在重新启动时不会恢复消息。Persistent(2)持久
Priority uint8 // 优先级0~9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // 消息ID,需投递时赋值,不会自动赋值
Timestamp time.Time // 消息时间戳,需投递时赋值,否则为UTC0时
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
Body []byte // 消息体
}
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
发送一条消息到交换机
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
控制服务器在接收到ack之前将尝试在网络上为消费者保留多少消息或多少字节。
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
立即开始消费消息,消费者标识consumer应在信道上确保唯一,并且在需要停止获取消息调用ch.Cancel方法时传入该消费者标识。
consumer为空时,程序包会使用原子值生成进程内唯一标识。传入context的方法,当监听context取消时会自动对该消费者调用ch.Cancel方法。
手动调用ch.Cancel方法,或连接/信道失效(主动关闭或被动断开)时,Delivery单向通道会被关闭。
func (ch *Channel) Cancel(consumer string, noWait bool) error
停止向消费者交付新的消息。
========== ========== ========== ========== ==========
type Delivery struct {
Acknowledger Acknowledger // the channel from which this delivery arrived
ConsumerTag string // Valid only with Channel.Consume
MessageCount uint32 // Valid only with Channel.Get
DeliveryTag uint64
Redelivered bool // 首次消费时为false,requeue后为true
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key
// ... 更多字段同Publishing结构,值源于投递时赋值。
}
消费者的autoAck为false时手动确认消息,当autoAck为true时不应调用以下方法:
func (d Delivery) Ack(multiple bool) error
确认一条消息
func (d Delivery) Nack(multiple, requeue bool) error
func (d Delivery) Reject(requeue bool) error
否定确认消息