Pulsar 消息概念1
admin
2023-01-31 14:27:02
0

Pulsar官方文档 概念和架构-Messaging Concepts中主要内容

1 消息组成

组成 说明
Value / data payload 消息携带的数据,所有pulsar的消息携带原始bytes,但是消息数据也需要遵循数据shcema
Key 消息可以被Key打标签。这可以对topic压缩之类的事情起作用
Properties 用户定义属性的可选键/值映射
Producer name 生成消息的生产者的名称(生产者自动被赋予默认名称,但您也可以显式地应用自己的名称)
Sequence ID 每一个消息在其主题上都属于一个有序序列。消息的序列ID是它在该序列中的顺序
Publish time 消息发布时的时间戳(由生产者自动应用)
Event time 一个可选的时间戳,应用程序可以附加到消息上,表示某个事件发生的时间,例如消息被处理的时间。如果未显式设置,则消息的事件时间为0。

2 生产者 发送模式

模式 说明
同步发送 生产者将在发送每个消息后等待代理的确认。如果未收到确认,则生产者将认为发送操作失败
异步发送 生产者将把消息放入阻塞队列并立即返回。客户端库随后将消息发送到后台的代理。如果队列已满(最大大小可配置,则在调用API时,生产者可能会被阻止或立即失败,具体取决于传递给生产者的参数

2.1 消息压缩支持
LZ4
ZLIB
ZSTD
SNAPPY

2.2 支持批处理
如果启用批处理,则生产者将在单个请求中累积并发送一批消息。批处理大小是由消息的最大数量和最大发布延迟定义的。
3 消费者 接收模式

模式 说明
同步接收 同步接收将被阻止,直到有消息可用
异步接收 异步接收将立即返回一个future值,例如Java中的CompletableFuture,该值在新消息可用时完成

3.1 监听
客户端库为用户提供侦听器实现。例如,Java客户机提供了一个MessageListener接口。在这个接口中,只要接收到新消息,就会调用received方法。
3.2 确认
当使用者成功使用消息时,使用者会向代理发送确认请求,以便代理丢弃该消息。否则,它将存储消息。
消息可以逐个确认,也可以累积确认。对于累积确认,消费者只需要确认它收到的最后一条消息。流中直至(包括)所提供消息的所有消息将不会重新传递给该使用者
( 累积确认不能与共享订阅模式一起使用,因为共享模式涉及多个对同一订阅具有访问权限的使用者)
在共享订阅模式下,可以单独确认消息。
3.3 否定确认
当使用者一次未成功使用消息,并且希望再次使用该消息时,使用者可以向代理发送否定的确认,然后代理将重新传递该消息。

消息可以被一个接一个地否定或累积地承认,这取决于消费订阅模式。
在独占订阅模式和故障转移订阅模式中,消费者只会消极地确认他们收到的最后一条消息。

在“共享”和“密钥共享”订阅模式中,您可以分别对消息进行否定性确认

3.4 确认超时
当消息未成功使用,并且您希望触发代理自动重新传递消息时,可以采用未确认消息自动重新传递机制。客户端将在整个确认超时时间范围内跟踪未确认消息,并在指定确认超时时自动向代理发送重新传递未确认消息请求
注意
在确认超时之前使用否定确认。否定确认以更精确的方式控制单个消息的重新传递,并在消息处理时间超过确认超时时避免无效的重新传递。
4 死信主题
在某些消息无法由消费者使用时,会成生新的消息。在这种机制中,无法使用的消息存储在一个单独的主题中,称为死信主题。您可以决定如何处理死信主题中的消息
下面的示例演示如何使用默认的死信主题在Java客户机中启用死信主题

Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
默认的死信主题使用以下格式:
--DLQ
如果要指定死信主题的名称,请使用以下Java客户端示例:
Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("your-topic-name")
.build())
.subscribe();
死信主题取决于邮件的重新传递。由于确认超时或否定确认,消息将重新传递。如果要对消息使用否定确认,请确保在确认超时之前对其进行否定确认。
注意
目前,死信主题仅在共享订阅模式下启用

相关内容

热门资讯

德国总理:美国正在被伊朗羞辱 德国之声4月27日报道,德国总理默茨在访问一所学校时表示,在当前的持续冲突中,伊朗领导层正试图羞辱美...
理响中国|“长”歌以行,风云激... 光阴如梭,东方潮阔。这里是中国的长三角,世界的长三角。无论过去、现在还是未来,这片土地都因时代而生,...
白宫:特朗普及其国安团队开会讨... 新华社华盛顿4月27日电 美国白宫新闻秘书莱维特27日在记者会上证实,总统特朗普及其国家安全团队当天...
人民日报刊文:日本放开杀伤性武... 日本放开杀伤性武器出口推高地缘冲突风险(国际论坛)常思纯《人民日报》(2026年04月28日 第 0...
医疗保障法草案二审:明确生育保... 满足多样化健康保障需求本报记者 彭 波4月27日,医疗保障法草案二审稿提请十四届全国人大常委会第二十...
天津一景区发生自转旋翼机事故1... 澎湃新闻记者 吕新文中国民用航空华北地区管理局4月22日公布《豪客通航“10•1”天津长芦汉盐旅游区...
卡塔尔埃米尔与美国总统特朗普通... 当地时间24日,卡塔尔埃米尔塔米姆与美国总统特朗普通电话,重点就中东地区局势以及伊朗与美国谈判问题交...
男子30年前被扣押2859克黄... 澎湃新闻记者 王鑫家住辽宁省大连市的潘永嘉近日向澎湃新闻反映称,三十年前,他在大连周水子机场被盖州市...
商务部:取消反制欧盟两家金融机... 中华人民共和国商务部令二〇二六年 第1号鉴于欧盟已取消对中国两家金融机构的制裁措施,现公布《关于取消...
过去24小时共有5艘船只通过霍... 总台记者当地时间24日获悉,过去24小时内,共有5艘船只通过霍尔木兹海峡,其中包括一艘伊朗油轮。(总...