Apache Pulsar 初级使用

分享 123456789987654321 ⋅ 于 2022-07-26 15:47:55 ⋅ 1371 阅读

Apache Pulsar

1.Apache Pulsar基本介绍

Apache Pulsar 是一个云原生企业级的发布订阅(pub-sub)消息系统,最初由Yahoo开发,并于2016年底开源,现在是 Apache软件基金会顶级开源项目。Pulsar在Yahoo的生产环境运行了三年多,助力Yahoo的主要应用,如Yahoo Mail、 Yahoo Finance、Yahoo Sports、Flickr、Gemini广告平台和Yahoo分布式键值存储系统Sherpa。

2.Apache Pulsar的功能与特性:

1) 多租户模式: 
2) 灵活的消息系统 
3) 云原生架构 
4) segmented Sreams(分片流) 
5) 支持跨地域复制

2.1多租户模式

租户和命名空间(namespace)是 Pulsar 支持多租户的两个核心概念。 
在租户级别,Pulsar 为特定的租户预留合适的存储空间、应用授权与认证机制。 
在命名空间级别,Pulsar 有一系列的配置策略(policy),包括存储配额、流控、消息过期策略和命名空间之间的隔离策略

file

2.2灵活的消息系统

    1.Pulsar 做了`队列模型`和`流模型`的统一,在 Topic 级别只需保存一份数据,同一份数据可多次消费。以流式、 队列等方式计算不同的订阅模型大大提升了灵活度。 
    2.同时pulsar通过事务采用Exactly-Once(精准一次)在进行消息传输过程中, 可以确保数据不丢不重

2.3云原生架构

Pulsar 使用计算与存储分离的云原生架构,数据从 Broker 搬离,存在共享存储内部。上层是无状态 Broker ,复制消息分发和服务;下层是持久化的存储层 Bookie 集群。`Pulsar 存储是分片的`,这种构架可以`避免扩容时受限制`,实现数据的独立扩展和快速恢复

file

2.4Segmented Streams(分片流)

Pulsar 将无界的数据看作是分片的流,分片分散存储在分层存储(tiered storage)、BookKeeper 集群和 Broker 节点上,而对外提供一个统一的、无界数据的视图。其次,不需要用户显式迁移数据,减少存储成本并 保持近似无限的存储。

file

2.5支持跨地域复制

Pulsar 中的跨地域复制是将 Pulsar 中持久化的消息在多个集群间备份。在 Pulsar 2.4.0 中新增了复制订阅 模式(Replicated-subscriptions),在某个集群失效情况下,该功能可以在其他集群恢复消费者的消费状态, 从而达到热备模式下消息服务的高可用。

file

2.6层级存储

Infinite Stream: 以流的方式永久保存原始数据 
分区的容量不再受限制 
充分利⽤云存储或现有的廉价存储 ( 例如 HDFS) 
数据统⼀表征:客户端无需关心数据究竟存储在哪⾥

file

2.7Pulsar IO(Connector) 连接器

1.Pulsar IO 分为输入(Input)和输出(Output)两个模块,输入代表数据从哪里来,通过 Source 实现数据输入。输出   代表数据要往哪里去,通过Sink实现数据输出。 
2.Pulsar提出了Connector(也称为 Pulsar IO),用于解决 Pulsar 与周边系统的集成问题,帮助用户高效完成工作。 
3.目前 pulsar IO 支持非常多的连接集成操作: 例如HDFS 、Spark、Flink 、Flume 、ES 、HBase等

file

2.8Pulsar Funcations(轻量级计算框架)

1.Pulsar Functions 是一个轻量级的计算框架,可以给用户提供一个部署简单、运维简单、API 简单的 FASS( Function as a service)平台。Pulsar Functions 提供基于事件的服务,支持有状态与无状态的多语言计算,是 对复杂的大数据处理框架的有力补充。 
2.Pulsar Functions 的设计灵感来自于 Heron 这样的流处理引擎,Pulsar Functions 将会拓展 Pulsar 和整个消 息领域的未来。使用 Pulsar Functions,用户可以轻松地部署和管理 function,通过 function 从 Pulsar topic 读取数据或者生产新数据到 Pulsar topic。

file

3.Pulsar与kafka的对比

1) 模型概念 
    Kafka:  producer – topic – consumer group – consumer 
    Pulsar: producer – topic -subsciption- consumer 
2) 消息消费模式 
    Kafka: 主要集中在流(Stream) 模式, 对单个partition是独占消费, 没有共享(Queue)的消费模式 
    Pulsar: 提供了统一的消息模型和API. 流(Stream) 模式 – 独占和故障切换订阅方式 ; 队列(Queue)模式 – 共享订    阅的方式 
3) 消息确认(ack)
    Kafka: 使用偏移量 offset 
    Pulsar: 使用专门的cursor管理. 累积确认和kafka效果一样; 提供单条或选择性确认 
4) 消息保留: 
    Kafka: 根据设置的保留期来删除消息, 有可能消息没被消费, 过期后被删除, 不支持TTL 
    Pulsar: 消息只有被所有订阅消费后才会删除, 不会丢失数据,. 也运行设置保留期, 保留被消费的数据 . 支持TTL
Apache Kafka和Apache Pulsar都有类似的消息概念。 客户端通过主题与消息系统进行交互。 每个主题都可以分为多 个分区。 然而,Apache Pulsar和Apache Kafka之间的根本区别在于Apache` Kafka是以分区为存储中心`,而Apache `Pulsar是以Segment为存储中心`。

file

3.1kafka目前存在的痛点

1) Kafka 很难进行扩展,因为 Kafka 把消息持久化在 broker 中,迁移主题分区时,需要把分区的数据完全复制到其他 broker 中,这个操作非常耗时。 
2) 当需要通过更改分区大小以获得更多的存储空间时,会与消息索引产生冲突,打乱消息顺序。因此,如果用户需要保证消息的顺序,Kafka 就变得非常棘手了。 
3) 如果分区副本不处于 ISR(同步)状态,那么 leader 选取可能会紊乱。一般地,当原始主分区出现故障时,应该有一个 ISR 副本被征用,但是这点并不能完全保证。若在设置中并未规定只有 ISR 副本可被选为 leader 时,选出一个处于非同步状态的副本做 leader,这比没有 broker 服务该 partition 的情况更糟糕。 
4) 使用 Kafka 时,你需要根据现有的情况并充分考虑未来的增量计划,规划 broker、主题、分区和副本的数量,才能避免 Kafka 扩展导致的问题。这是 理想状况,实际情况很难规划,不可避免会出现扩展需求。 
5) Kafka 集群的分区再均衡会影响相关生产者和消费者的性能。 
6) 发生故障时,Kafka 主题无法保证消息的完整性(特别是遇到第 3 点中的情况,需要扩展时极有可能丢失消息)。 
7) 使用 Kafka 需要和 offset 打交道,这点让人很头痛,因为 broker 并不维护 consumer 的消费状态。 
8) 如果使用率很高,则必须尽快删除旧消息,否则就会出现磁盘空间不够用的问题。 
9) 众所周知,Kafka 原生的跨地域复制机制(MirrorMaker)有问题,即使只在两个数据中心也无法正常使用跨地域复制。因此,甚至 Uber 都不得不创建 另一套解决方案来解决这个问题,并将其称为 uReplicator (https://eng.uber.com/ureplicator/)。 
10) 要想进行实时数据分析,就不得不选用第三方工具,如 Apache Storm、Apache Heron 或 Apache Spark。同时,你需要确保这些第三方工具足以支撑传 入的流量。 
11) Kafka 没有原生的多租户功能来实现租户的完全隔离,它是通过使用主题授权等安全功能来完成的。

4.Apache Pulsar集群架构

4.1架构基本介绍

单个 Pulsar 集群由以下三部分组成: 
1.多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配 置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。 
2.多个 bookie 的 BookKeeper 集群负责消息的持久化存储。 
3.一个zookeeper集群,用来处理多个Pulsar集群之间的协调任务。

file

4.2Apache Pulsar提供的组件介绍

4.2.1Brokers介绍

Pulsar的broker是一个无状态组件, 主要负责运行另外的两个组件: 
1.一个 HTTP 服务器, 它暴露了 REST 系统管理接口以及在生产者和消费者之间进行 Topic查找的API。 
2.一个调度分发器, 它是异步的TCP服务器,通过自定义二进制协议应用于所有相关的数据传输。

    出于性能考虑,消息通常从Managed Ledger缓存中分派出去,除非积压超过缓存大小。如果积压的消息对于缓存来说太 大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。 
    最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用Java 客户端重 新发布到其他区域

4.2.2Zookeeper的元数据存储

Pulsar使用Apache Zookeeper进行元数据存储、集群配置和协调 :
    1.`配置存储`: 存储租户,命名域和其他需要全局一致的配置项 
    2.每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的)等等。

4.2.3基于bookKeeper持久化存储

Apache Pulsar 为应用程序提供有保证的信息传递, 如果消息成功到达broker, 就认为其预期到达了目的地。 

    为了提供这种保证,未确认送达的消息需要持久化存储直到它们被确认送达。这种消息传递模式通常称为持久消息传递 . 在Pulsar内部,所有消息都被保存并同步N份,例如,2个服务器保存四份,每个服务器上面都有镜像的RAID存储。 Pulsar用 Apache bookKeeper作为持久化存储。 bookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特 别适合Pulsar的应用场景: 
1) 使`pulsar能够利用独立的日志,称为ledgers`. 可以随着时间的推移为topic创建多个Ledgers 
2) 它为处理顺序消息提供了非常有效的存储 
3) 保证了多系统挂掉时Ledgers的读取一致性 
4) 提供不同的Bookies之间均匀的IO分布的特性 
5) 它在容量和吞吐量方面都具有水平伸缩性。能够通过增加bookies立即增加容量到集群中,并提升吞吐量 
6) Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备 (一个用于日志,另一个用于一般存储) ,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。

file

Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个bookKeeper存储节点(就是Bookies)的写入 。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义: 

1.Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。 
2.当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,ledger只会以只读模式打开。 
3.最后,当ledger中的条目不再有用的时候,整个ledger可以被删除(ledger分布是跨Bookies的)。

4.3Pulsar 代理

Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers。然而,在某些情况下,这种直连既不可行也不可 取,因为客户端并不知道broker的地址。 例如在云环境或者Kubernetes以及其他类似的系统上面运行Pulsar,直连 brokers就基本上不可能了。 

Pulsar proxy 为这个问题提供了一个解决方案, 为所有的broker提供了一个网关, 如果选择运行了Pulsar Proxy. 所有 的客户都会通过这个代理而不是直接与brokers通信

5.Pulsar集群部署及可视化 监控部署

1.local模式

#下载
https://pulsar.apache.org/en/download/
#安装jdk

#启动
cd /export/server/puslar_2.8.1/bin 
./pulsar standalone

#模拟开启消费者监听数据
./pulsar-client consume my-topic -s "first-subscription"

#模拟生产一条数据
./pulsar-client produce my-topic --messages "hello-pulsar“

2.分布式集群模式

2.1集群构建

在第二台和第三台节点上分别配置软连接 
cd /export/server 
ln -s apache-pulsar-2.8.1/ pulsar_2.8.1
# 修改第二台和第三台的broker的地址和bookies地址 
node2: 
cd /export/server/pulsar_2.8.1/conf/ 

vi bookkeeper.conf 
修改其第56行:修改本地ip地址 advertisedAddress=node1.itcast.cn
vi broker.conf 
修改第44行: 更改为本地ip地址 advertisedAddress=node1.itcast.cn 

#第三台节点: 都更改为对应IP地址或者主机名即可

2.2集群启动

2.2.1第一步: 首先启动zookeeper集群
cd /export/server/zookeeper/bin 
./zkServer.sh start 
注意: 三个节点依次都要启动, 启动后 通过 
./zkServer.sh status 
查看状态, 必须看到一个leader 和两个follower 才可以使用
2.2.2第二步: 初始化元数据(此操作, 仅需要初始化一次即可)
首先初始化Pulsar集群元数据: 
cd /export/server/pulsar_2.8.1/bin 
./pulsar initialize-cluster-metadata \ --cluster pulsar-cluster \ 
--zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181 \ 
--configuration-store node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181 \ 
--web-service-url http://node1.itcast.cn:8080,node2.itcast.cn:8080,node3.itcast.cn:8080 \ 
--web-service-url-tls https://node1.itcast.cn:8443,node2.itcast.cn:8443,node3.itcast.cn:8443 \ --broker-service-url pulsar://node1.itcast.cn:6650,node2.itcast.cn:6650,node3.itcast.cn:6650 \ --broker-service-url-tls pulsar+ssl://node1.itcast.cn::6651,node2.itcast.cn:6651,node3.itcast.cn:6651 

接着初始化bookkeeper集群: 若出现提示输入Y/N: 请输入Y 
./bookkeeper shell metaformat
2.2.3第三步: 启动bookkeeper服务
cd /export/server/pulsar_2.8.1/bin 
./pulsar-daemon start bookie 
注意: 三个节点都需要依次启动 验证是否启动: 可三台都检测 

./bookkeeper shell bookiesanity 
提示: Bookie sanity test succeeded 认为启动成功

测试

在pulsar的bin目录下, 专门提供了一个pulsar-client的客户端工具, Pulsar-Clinet工具允许使用者在运行的集群中消费并发送消息到 Pulsar Topic中
#模拟开启消费者监听数据
./pulsar-client consume persistent://public/default/test -s "consumer-test"
#模拟生产一条数据
./pulsar-client produce persistent://public/default/test --messages "hello-pulsar"

3.Apache Pulsar可视化监控部署

3.1第一步: 下载Pulsar-Manager

下载地址: https://dist.apache.org/repos/dist/release/pulsar/pulsar-manager/pulsar-manager-0.2.0/apache-pulsar- manager-0.2.0-bin.tar.gz

3.2第二步: 上传到服务器, 并解压

cd /export/software rz 
上传 apache-pulsar-manager-0.2.0-bin.tar.gz 

解压操作: 
tar -zxf apache-pulsar-manager-0.2.0-bin.tar.gz -C /export/server/ 

cd /export/server/pulsar-manager 
接着再次解压: 
tar -xvf pulsar-manager.tar

3.3第三步: 拷贝dist包到 pulsar-manager目录下并更名为ui

cd /export/server/pulsar-manager/pulsar-manager 
cp -r ../dist ui

3.4第四步: 启动Pulsar

cd /export/server/pulsar-manager/pulsar-manager 
./bin/pulsar-manager

3.5第五步: 初始化超级用户密码

CSRF_TOKEN=$(curl http://node1.itcast.cn:7750/pulsar-manager/csrf-token) 
curl \ 
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \ 
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \ 
-H 'Content-Type: application/json' \ 
-X PUT http://node1.itcast.cn:7750/pulsar-manager/users/superuser \ 
-d '{"name": "pulsar", "password": "pulsar", "description": "test", "email": "username@test.org"}'

3.6第六步: 访问Pulsar UI

http://node1:7750/ui/index.html
用户名: pulsar 
密码: pulsar

6.Apache Pulsar主要功能介绍及使用

1.什么是多租户

多租户是一种架构,目的是为了让多用户环境下使用同一套程序,且保证用户间数据隔离 
简单讲:在一台服务器上运行单个应用实例,它为多个租户(客户)提供服务
多租户这一特性,使得各个部门之间可以共享同一份 数据,不用单独部署独立的系统来操作数据,很好的保证了各部门间数据一致性的问题,同时简化维护成本。

Pulsar 的多租户设计符合上述要求:
1.使用身份验证、授权和 ACL(访问控制列表)确保其安全性 
2.为每个租户强制执行存储配额 
3.支持在运行时更改隔离机制,从而实现操作成本低和管理简单

2.Pulsar多租户的相关特性_安全性(认证和授权)

一个多租户系统需要在租户内提供系统级别的安全性,细分来讲,主要可以归类为一下两点: 
1.租户只能访问它有权限访问的 topics 
2.不允许访问它无法访问的 topics

在 Pulsar 中,多租户的安全性是通过身份验证和授权机制实现的。当 client 连接到 pulsar broker 时,broker 会使用身份验证插件来验证 此客户端的身份,然后为其分配一个 string 类型的 role token。role token 主要有如下作用: 1. 判断 client 是否有对 topics 进行生产或消费消息的权限 
2. 管理租户属性的配置

Pulsar 目前支持一下几种身份认证, 同时支持自定义实现自己的身份认证程序 
1. TLS 客户端身份认证 
2. 雅虎的身份认证系统: Athenz 
3. Kerberos 
4. JSON Web Token 认证

3.Pulsar多租户的相关特性_隔离性

隔离性主要分为如下两种: 
#软隔离: 通过磁盘配额,流量控制和限制等手段
存储:
    Apache Pulsar 使用Bookkeeper来作为其存储层, bookie是Bookkeeper的实例, Bookkeeper本身就是具有I/O分离(读写分 离)的特性,可以很多的做好IO隔离, 提升读写的效率 
    同时, 不同的租户可以为不同的NameSpace配置不同的存储配额, 当租户内消息的大小达到了存储配额的限制, Pulsar会 采取相应的措施, 例如: 阻止消息生成, 抛异常 或丢弃数据等 
Broker: 
    每个Borker使用的内存资源都是有上限的, 当Broker达到配置的CPU或内存使用的阈值后, Pulsar会迅速的将流量转移到 负载较小的Broker处理 
    在生产和消费方面, Pulsar都可以进行流量控制,租户可以配置发送和接收的速率,避免出现一个客户端占用当前Broker 的所有处理资源

#硬隔离: 物理资源隔离
    Pulsar 允许将某些租户或名称空间与特定 Broker 进行隔离。这可确保这些租户或命名空间可以充分利 用该特定 Broker 上的资源

4.Pulsar多租户的相关操作

1 - 获取租户列表

cd /export/server/brokers/bin 
./pulsar-admin tenants list

2 - 创建租户

cd /export/server/brokers/bin 
./pulsar-admin tenants create my-tenant 在创建租户时,可以使用-r 或者 --admin-roles标志分配管理角色。
可以用逗号分隔的列表指定多个角色。
./pulsar-admin tenants create my-tenant \ 
--admin-roles role1,role2,role3 

./pulsar-admin tenants create my-tenant \ 
-r role1

3 - 获取配置:

pulsar-admin tenants get my-tenant 
{
    "adminRoles": [
        "admin1","admin2" 
    ],
    "allowedClusters": [ 
        "cl1", "cl2"
    ]
}

4 - 更新配置:

cd /export/server/brokers/bin 
./pulsar-admin tenants update my-tenant 
基于update可以更新租户的相关配置信息

5 - 删除租户:

#注意: 在删除的时候, 如果库下已经有名称空间, 是无法删除的,需要先删除名称空间 
cd /export/server/brokers/bin 
./pulsar-admin tenants delete my-tenant

5.Pulsar的名称空间

1.什么是名称空间

    namespace是Pulsar中最基本的管理单元,在namespace这一层面,可以设置权限,调整副本设置,管理跨集群的消息复制,控制 消息策略和执行关键操作。一个主题topic可以继承其所对应的namespace的属性,因此我们只需对namespace的属性进行设置,就可以 一次性设置该namespace中所有主题topic的属性。

    namespace有两种,分别是本地的namespace和全局的namespace: 
    本地namespace——仅对定义它的集群可见。 
    全局namespace——跨集群可见,可以是同一个数据中心的集群,也可以是跨地域中心的集群,这依赖于是否在namespace中设置了 跨集群拷贝数据的功能。

    虽然本地namespace和全局namespace的作用域不同,但是只要对他们进行适当的设置,都可以跨团队和跨组织共享。一旦生产者 获得了namespace的写入权限,那么它就可以往namespace中的所有topic主题写入数据,如果某个主题不存在,则在生产者第一次写入 数据时动态创建。

2.Pulsar NameSpace(名称空间) 相关操作_基础操作

1 - 在指定的租户下创建名称空间
cd /export/server/brokers/bin 
./pulsar-admin namespaces create test-tenant/test-namespace
2 - 获取所有的名称空间列表
cd /export/server/brokers/bin 
./pulsar-admin namespaces list test-tenant
3 - 删除名称空间
cd /export/server/brokers/bin 
./pulsar-admin namespaces delete test-tenant/ns1
1 - 获取名称空间相关的配置策略
cd /export/server/brokers/bin 
./pulsar-admin namespaces policies test-tenant/test-namespace
2 - 配置复制集群
2.1- 设置复制集群: 
cd /export/server/brokers/bin 
pulsar-admin namespaces set-clusters test-tenant/ns1 --clusters cl2 

2.2- 获取给定命名空间复制集群的列表 
pulsar-admin namespaces get-clusters test-tenant/ns1
3 - 配置 backlog quota 策略
#待定配额帮助Broker在某个名称空间达到某个阈值限制时限制其带宽/存储。管理员可以设置限制,并在达到限制后采取相应的行动。
3.1- 设置backlog quota 策略 
cd /export/server/brokers/bin 
pulsar-admin namespaces set-backlog-quota --limit 10G --limitTime 36000 --policy producer_request_hold test-tenant/ns1 

--policy 的值选择: 
    producer_request_hold:broker 暂停运行,并不再持久化生产请求负载 
    producer_exception:broker 抛出异常,并与客户端断开连接。 
    consumer_backlog_eviction:broker 丢弃积压消息

3.2- 获取 backlog quota 策略    
pulsar-admin namespaces get-backlog-quotas test-tenant/ns1 {
        "destination_storage": { 
        "limit": 10, 
        "policy": "producer_request_hold" 
    }
}

3.3 - 移除backlog quota 策略 
pulsar-admin namespaces remove-backlog-quota test-tenant/ns1    
4 - 配置持久化策略
#持久化策略可以为给定命名空间下 topic 上的所有消息配置持久等级
4.1- 设置持久化策略 
pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 -- ml-mark-delete-max-rate 0 test-tenant/ns1 

参数说明: 
Bookkeeper-ack-quorum:每个 entry 在等待的 acks(有保证的副本)数量,默认值:0 
Bookkeeper-ensemble:单个 topic 使用的 bookie 数量,默认值:0 
Bookkeeper-write-quorum:每个 entry 要写入的次数,默认值:0 
Ml-mark-delete-max-rate:标记-删除操作的限制速率(0表示无限制),默认值:0.0

4.2- 获取持久化策略 
pulsar-admin namespaces get-persistence test-tenant/ns1 {
"bookkeeperEnsemble": 3, 
"bookkeeperWriteQuorum": 2, 
"bookkeeperAckQuorum": 2, 
"managedLedgerMaxMarkDeleteRate": 0 
}
5 - 配置消息存活时间(TTL)
#以秒为单位
5.1- 设置消息存活时间 
    pulsar-admin namespaces set-message-ttl --messageTTL 100 test-tenant/ns1
5.2- 获取消息的存活时间 
    pulsar-admin namespaces get-message-ttl test-tenant/ns1
5.3- 删除消息的存活时间 
    pulsar-admin namespaces remove-message-ttl test-tenant/ns1
5 - 配置整个名称空间中Topic的消息发送速率
5.1- 设置Topic的消息发送的速率 
pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \ 
--msg-dispatch-rate 1000 \ 
--byte-dispatch-rate 1048576 \ 
--dispatch-rate-period 1 

参数说明: 
--msg-dispatch-rate : 每dispatch-rate-period秒钟发送的消息数量 
--byte-dispatch-rate : 每dispatch-rate-period秒钟发送的总字节数 
--dispatch-rate-period : 设置发送的速率, 比如 1 表示 每秒钟 

5.2 获取topic的消息发送速率 

pulsar-admin namespaces get-dispatch-rate test-tenant/ns1 {
"dispatchThrottlingRatePerTopicInMsg" : 1000, 
"dispatchThrottlingRatePerTopicInByte" : 1048576, 
"ratePeriodInSecond" : 1 
}
6 - 配置整个名称空间中Topic的消息接收速率
6.1- 设置Topic的消息接收的速率 
pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \ 
--msg-dispatch-rate 1000 \ 
--byte-dispatch-rate 1048576 \ 
--dispatch-rate-period 

参数说明: 
--msg-dispatch-rate : 每dispatch-rate-period秒钟接收的消息数量 
--byte-dispatch-rate : 每dispatch-rate-period秒钟接收的总字节数 
--dispatch-rate-period : 设置接收的速率, 比如 1 表示 每秒钟 

6.2 获取topic的消息接收速率 

pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1 {
"dispatchThrottlingRatePerTopicInMsg" : 1000, 
"dispatchThrottlingRatePerTopicInByte" : 1048576, 
"ratePeriodInSecond" : 1 
}
7 - 配置整个名称空间中Topic的复制集群的速率
7.1- 设置Topic的消息复制集群的速率 
pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \ 
--msg-dispatch-rate 1000 \ 
--byte-dispatch-rate 1048576 \ 
--dispatch-rate-period 1 

参数说明: 
--msg-dispatch-rate : 每dispatch-rate-period秒钟复制集群的消息数量 
--byte-dispatch-rate : 每dispatch-rate-period秒钟复制集群的总字节数 
--dispatch-rate-period : 设置复制集群的速率, 比如 1 表示 每秒钟 

7.2 获取topic的消息复制集群的速率 
pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1 {
"dispatchThrottlingRatePerTopicInMsg" : 1000, 
"dispatchThrottlingRatePerTopicInByte" : 1048576, 
"ratePeriodInSecond" : 1 }

6.Pulsar的topic相关操作

什么是Topic

Topic,话题主题的含义, 在一个名称空间下, 可以定义多个Topic 通过Topic进行数据的分类划分, 将不同的类别的消息放置到不同Topic, 消费者也可以从不同Topic中获取到相关的消息, 是一种更细粒度的消息划分操作, 同时在Topic下可以划分为多个分片, 进行分布式的存储 操作, 每个分片下还存在有副本操作, 保证数据不丢失, 当然这些分片副本更多是由bookkeeper来提供支持
Pulsar 提供持久化与非持久化两种topic。持久化topic是消息发布、消费的逻辑端点。 
#持久化topic地址的命名格式如下:
persistent://tenant/namespace/topic

非持久化消息

非持久topic应用在仅消费实时发布消息与不需要持久化保证的应用程序。 通过这种方式,它通过删除持久消息的开销来减少消息 发布延迟。 
#非持久化topic地址的命名格式如下:
non-persistent://tenant/namespace/topic

Pulsar Topic(主题) 相关操作_基础操作

1 - 创建Topic
#方式一: 创建一个没有分区的topic 
bin/pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic 

#方式二: 创建一个有分区的topic 
bin/pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/my-topic --partitions 4 

注意: 不管是有分区还是没有分区, `创建topic后,如果没有任何操作, 60s后pulsar会认为此topic是不活动的, 会 自动进行删除`, 以避免生成垃圾数据 

相关配置: 
Brokerdeleteinactivetopicsenabenabled : 默认值为true 表示是否启动自动删除 BrokerDeleteInactiveTopicsFrequencySeconds: 默认为60s 表示检测未活动的时间
2 - 列出当前某个名称空间下的所有Topic
./pulsar-admin topics list my-tenant/my-namespace
3 - 更新Topic操作
我们可针对有分区的topic去更新其分区的数量 
./pulsar-admin topics update-partitioned-topic persistent://my-tenant/my-namespace/my-topic --partitions 8
4 - 删除Topic操作
#删除没有分区的topic: 
    bin/pulsar-admin topics delete persistent://my-tenant/my-namespace/my-topic 
#删除有分区的
    topic bin/pulsar-admin topics delete-partitioned-topic persistent://my-tenant/my-namespace/my-topic

Pulsar Topic(主题) 高级操作

1 - 授权
pulsar-admin topics grant-permission --actions produce,consume --role application1 persistent://test- tenant/ns1/tp1
2- 获取权限
pulsar-admin topics grant-permission --actions produce,consume --role application1 persistent://test- tenant/ns1/tp1
3- 取消权限
pulsar-admin topics revoke-permission --role application1 persistent://test-tenant/ns1/tp1 {"application1": [
"consume", "produce" 
]
}

7.Apache Pulsar基于Java Api基本使用

7.1maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>pulsar_2.8.1_parent</artifactId>
        <groupId>com.itheima</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>pulsar_base</artifactId>

    <repositories><!--代码库-->
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-all</artifactId>
            <version>2.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-functions-api</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!--Fink 和 Pulsar 集成相关的包-->
        <!--fink 相关的包-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>

        <!--Pulsar 与 flink 整合包-->
        <dependency>
            <groupId>io.streamnative.connectors</groupId>
            <artifactId>pulsar-flink-connector_2.11</artifactId>
            <version>1.13.1.5-rc1</version>

            <exclusions>
                <exclusion>
                    <groupId>org.apache.pulsar</groupId>
                    <artifactId>pulsar-client-all</artifactId>
                </exclusion>
            </exclusions>

        </dependency>

        <!--Kafka和Pulsar整合的适配器包-->
        <!--<dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-kafka</artifactId>
            <version>2.8.0</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <!--Spark 的Pulsar适配器-->
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-spark</artifactId>
            <version>2.8.0</version>

            <exclusions>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.10</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.1</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-xml</artifactId>
            <version>2.11.0-M4</version>
        </dependency>

        <!--KOP协议-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

7.2使用JAVA如何管理租户

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.HashSet;
import java.util.List;

/**
 * 使用JAVA API 完成 租户操作
 */
public class _01_createTenants {

    public static void main(String[] args) throws Exception {

        // 1. 创建Pulsar的Admin管理对象
        String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";//必须是集群
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();

        //2.1: 创建租户
        HashSet<String> allowedClusters = new HashSet<>();
        allowedClusters.add("pulsar-cluster");//vim broker.conf  修改第98行: 修改集群的名称 clusterName=pulsar-cluster
        TenantInfo config = TenantInfo.builder().allowedClusters(allowedClusters).build();
        pulsarAdmin.tenants().createTenant("itcast_pulsar_t",config);

        // 2.2: 查看当前有那些租户
        List<String> tenants = pulsarAdmin.tenants().getTenants();
        for (String tenant : tenants) {
            System.out.println("租户信息为:"+tenant);
        }

        //2.3: 删除租户操作
        pulsarAdmin.tenants().deleteTenant("itcast_pulsar_t");

        //3. 关闭管理对象
        pulsarAdmin.close();
    }
}

7.3名称空间增删改查

import org.apache.pulsar.client.admin.PulsarAdmin;
import java.util.List;

/**
 * 使用JAVA API 完成 名称空间操作
 */
public class _02_createNamespace {

    public static void main(String[] args) throws Exception {

        //1. 创建Pulsar的Admin管理对象  必须是集群才可以
        String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
        //2. 执行相关的操作

        //2.1 如何创建名称空间
         pulsarAdmin.namespaces().createNamespace("itcast_pulsar_t/itcast_pulsar_n");

        //2.2 获取在某个租户下, 一共有那些名称空间:
        List<String> namespaces = pulsarAdmin.namespaces().getNamespaces("itcast_pulsar_t");

        for (String namespace : namespaces) {
            System.out.println(namespace);
        }

        //2.3: 删除名称空间
        pulsarAdmin.namespaces().deleteNamespace("itcast_pulsar_t/itcast_pulsar_n");

        //3. 关闭admin对象
        pulsarAdmin.close();
    }
}

7.4TOPIC增删改查

package com.itheima.admin;

import org.apache.pulsar.client.admin.PulsarAdmin;

import java.util.List;

/**
 *  使用JAVA API 完成 Topic相关的操作
 */

public class _03_createTopic {

    public static void main(String[] args) throws Exception {

        //1. 创建Pulsar的Admin管理对象
        String serviceHttpUrl = "http://node1:8080,node2:8080,node3:8080";
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
        //2. 执行相关的操作
        //2.1: 创建 Topic相关的操作: 有分区和没有分区, 以及持久化和非持久化
        //persistent 持久化消息     non-persistent  不持久化消息
        pulsarAdmin.topics().createNonPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic5");
        //pulsarAdmin.topics().createNonPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic2");
        // pulsarAdmin.topics().createPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic3",5);
        //pulsarAdmin.topics().createPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic5",5);

        //2.2: 查询当前有那些topic:
        //没有分区的topic查询
       /* List<String> topicList = pulsarAdmin.topics().getList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topicList) {
            System.out.println(topic);
        }*/

        //有分区的topic查询
        List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topicList) {
            System.out.println(topic);
        }

        //2.3 修改Topic 分片的数量
        pulsarAdmin.topics().updatePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic3",7);

        //2.4 一共有多少个分片呢
        int partitions = pulsarAdmin.topics().getPartitionedTopicMetadata("persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic3").partitions;
        System.out.println(partitions);

        //2.5: 删除Topic
        pulsarAdmin.topics().deletePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic3");

        //3. 关闭admin对象
        pulsarAdmin.close();
    }
}

7.5生产数据_同步方式

7.5.1同步发送(string消息)

package com.itheima.producer;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

/**
 * Pulsar 生产者 同步发送
 */
public class PulsarProducerSyncTest {
    public static void main(String[] args) throws Exception {

        //1. 创建Pulsar的客户端对象
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();

        //2. 通过客户端创建生产者的对象
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING) //指定发送string类型
                .topic("persistent://public/default/test_src")
                .create();
        //3. 使用生产者发送数据
        producer.send("hello java API pulsar ...");
        System.out.println("数据生产完成....");

        //4. 释放资源
        producer.close();
        pulsarClient.close();
    }
}

7.5.2schama(实体类消息发送)

import com.itheima.pojo.User;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AvroSchema;

/**
 * Pulsar 生产者 schema的方式方案
 */
public class PulsarProducerSchemaTest {
    public static void main(String[] args) throws Exception {

        //1. 创建pulsar的客户端对象
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();

        //2. 基于客户端对象进行构建生产者对象
        Producer<User> producer = pulsarClient.newProducer(AvroSchema.of(User.class))
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my_tt03")
                .create();

        //3. 进行数据生产
        User user = new User();
        user.setName("张三");
        user.setAge("20");
        user.setAddress("北京");
        producer.send(user);

        //4. 释放资源
        producer.close();
        pulsarClient.close();
    }
}

7.5.3异步发送

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

/**
 * Pulsar 生产者 异步发送
 */
public class PulsarProducerAsyncTest {
    public static void main(String[] args) throws Exception {

        //1. 创建Pulsar的客户端对象
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();

        //2. 通过客户端构建生产者的对象
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic1")
                .create();
        //3. 进行数据发送操作
        // 发现数据并没有生产成功, 主要原因是
        //          因为采用异步的发送方案, 这种发送方案会先将数据写入到客户端缓存中, 当缓存中数据达到一批后 才会进行发送操作
        producer.sendAsync("hello async pulsar...2222");
        System.out.println("数据生产成功....");

        // 可以发送完成后, 让程序等待一下, 让其将缓冲区中数据刷新到pulsar上 然后在结束
        Thread.sleep(1000);
        //4. 释放资源
        producer.close();
        pulsarClient.close();
    }
}

7.6基于Pulsar实现数据消费

7.6.1消费者

package com.itheima.consumer;

import org.apache.pulsar.client.api.*;

/**
 *  Pulsar的消费者的使用
 */

public class PulsarConsumerTest {

    public static void main(String[] args)  throws Exception{

        //1. 创建pulsar的客户端的对象
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();

        //2. 基于客户端构建消费者对象
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic("txn_t4")
                .subscriptionName("sub_04")//消费名称,随便写
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();

        //3. 循环从消费者读取数据

        while(true) {
            //3.1: 接收消息
            Message<String> message = consumer.receive();

            //3.2: 获取消息
            String msg = message.getValue();

            //3.3: 处理数据--- 业务操作
            System.out.println("消息数据为:"+msg);

            //3.4: ack确认操作 不加可能会出现消费重复问题
            consumer.acknowledge(message);

            // 如果消费失败了, 可以采用try catch方式进行捕获异常, 捕获后, 可以进行告知没有消费
            consumer.negativeAcknowledge(message);
        }
    }
}

7.6.2schema模式消费者

import com.itheima.pojo.User;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AvroSchema;

/**
*Pulsar的消费者的使用  基于schema形式
*/
public class PulsarConsumerSchemaTest {

    public static void main(String[] args) throws Exception {
        //1. 创建Pulsar的客户端对象
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();

        //2. 基于客户端对象构建消费者对象

        Consumer<User> consumer = pulsarClient.newConsumer(AvroSchema.of(User.class))
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my_tt04")
                .subscriptionName("sub_05")
                .subscribe();
        //3. 循环读取数据操作

        while(true){
            //3.1: 接收消息
            Message<User> message = consumer.receive();

            //3.2: 获取消息数据
            User msg = message.getValue();
            System.out.println(msg);

        }
    }
}

7.6.3批量消费

import org.apache.pulsar.client.api.*;
import java.util.concurrent.TimeUnit;

/**
 * Pulsar的消费者的使用  批量消费
 */

public class PulsarConsumerBatchTest {

    public static void main(String[] args) throws Exception {

        //1. 构建Pulsar的客户端对象
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://node1:6650,node2:6650,node3:6650").build();

        //2. 通过客户端构建消费者对象

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/t_topic1")
                .subscriptionName("sub_04")
                // 设置支持批量读取参数配置
                .batchReceivePolicy(
                        BatchReceivePolicy.builder()
                                .maxNumBytes(1024 * 1024)//允许每一条数据最大的字节大小
                                .maxNumMessages(100) //一次性最多多少条数据
                                .timeout(2000, TimeUnit.MILLISECONDS) //等待时间
                                .build()
                )
                .subscribe();

        //3. 循环读取数据
        while (true) {

            //3.1 读取消息(批量)
            Messages<String> messages = consumer.batchReceive();

            //3.2: 获取消息数据
            for (Message<String> message : messages) {
                String msg = message.getValue();

                System.out.println("消息数据为:" + msg);

                //3.3 ack确认
                consumer.acknowledge(message);
            }
        }
    }
}
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-123456789987654321,http://hainiubl.com/topics/75883
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter