Pulsar Function 例子
admin
2023-01-23 10:01:14
0

在单机环境下实现字符串追加函数(Pulsar 2.4.2版本)

1 启动单机Pulsar

     $ bin/pulsar-daemon start standalone

2 创建函数

1) 准备环境

    项目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'

2) 创建JAVA函数(此函数用于数据源来的topic schema是string,输出的tiopic schema是string)

     Pulsar Function 例子

     导出jar包,放到pulsar服务器目录下,本例子放在 /data/jar/下

3)使用命令行工具加载函数到Pulsar,                     

   bin/pulsar-admin functions create \

   --classname test.AppStrFunction \

   --jar /data/jar/pf.jar \

   --inputs persistent://public/default/tlstest \

   --output persistent://public/default/teststr \

   --tenant public \

   --namespace default \

   --name appStrFunction

   参数说明:

                     

参数
说明
functions通知 pulsar broker,函数操作
create创建函数,默认创建成功后启动
classname函数类名称,需要加上包名
jar指定 jar 包的运行路径
inputs指定 函数 数据的来源在哪里,支持多个 topics 作为输入
output如果该 函数 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出
tenant指定该 函数 运行的租户名
namespace指定该 函数 运行的命名空间
name指定该 函数 运行的名称
以下是函数相关其他操作

停止函数

bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name appStrFunction

启动函数

bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name appStrFunction

删除函数

bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name appStrFunction

函数的日志在 pulsar安装目录 /logs/functions下

3 测试函数

   根据前边函数已成功加载启动

1)向tlstest主题发送消息   

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
public class SendMsgTest{
  public static void main(String[] args){
      String url="pulsar://192.168.1.48:6650";
  try{
     PulsarClient client =PulsarClient.builder()
           .serviceUrl(url)
           .connectionTimeout(10,TimeUnit.SECONDS)
           .build();
     Producer producer=client.newProducer(Schema.STRING)
           .topic("tlstest")
           .sendTimeout(10,TimeUnit.SECONDS)
           .producerName("senduser")
           .create();
           producer.send("this is a book");
           System.out.print("send ok");
           client.close();
      }catch(Exception e){
        e.printStackTrace();
      }
  }
}

2)读取teststr主题消息

   

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import schema.OrderModel;
import com.alibaba.fastjson.JSON;
public class RecFunTest {
public static void main(String[] args) {
String url = "http://192.168.1.48:8080";
try{
  PulsarClient client =PulsarClient.builder()
    .serviceUrl(url)
    .build();
 Consumer consumer=client.newConsumer(Schema.STRING)
    .topic("teststr")
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscriptionType(SubscriptionType.Exclusive)//订阅模式  Exclusive(独占,默认模式) Failover(灾备)Shared(共享)
    .subscriptionName("wbq")//订阅者名称
    .subscribe();
 while (true) {
   Message mondmsg = consumer.receive();
   String msg=mondmsg.getValue();
                System.out.println("receive message=:"+msg);
             }
  }catch(Exception e){
     e.printStackTrace();
  }
 }
}


相关内容

热门资讯

重磅消息“花城牌舍.真的有挂吗... 重磅消息“花城牌舍.真的有挂吗?”原来真的有挂您好,花城牌舍这个游戏其实有挂的,确实是有挂的,需要了...
最新引进“兴动茶馆.怎么开挂?... 您好:兴动茶馆这款游戏可以开挂,确实是有挂的,需要了解加客服微信【9784099】很多玩家在这款游戏...
今日重大通报“边锋老友麻将.真... 网上科普关于“边锋老友麻将有没有挂”话题很是火热,小编也是针对边锋老友麻将作*弊开挂的方法以及开挂对...
玩家攻略科普“十三十三水全民比... 玩家攻略科普“十三十三水全民比鸡.辅助器?”太坑了果然有挂您好,十三十三水全民比鸡这个游戏其实有挂的...
终于了解“中至江西麻将.真的有... 您好:中至江西麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【9752949】很多玩家在这款...
终于明白“云圈丰城麻将.辅助开... 家人们!今天小编来为大家解答云圈丰城麻将透视挂怎么安装这个问题咨询软件客服徽9752949的挂在哪里...
终于懂了“新皇豪炸金花.有没有... 网上科普关于“新皇豪炸金花有没有挂”话题很是火热,小编也是针对新皇豪炸金花作*弊开挂的方法以及开挂对...
【第一资讯】“西南互娱.可以开... 网上科普关于“西南互娱有没有挂”话题很是火热,小编也是针对西南互娱作*弊开挂的方法以及开挂对应的知识...
【第一财经】“一生棋牌.到底有... 【第一财经】“一生棋牌.到底有挂吗?”外卦神器下载您好,一生棋牌这个游戏其实有挂的,确实是有挂的,需...
玩家最新攻略“皇豪众娱牛牛.开... 有 亲,根据资深记者爆料皇豪众娱牛牛是可以开挂的,确实有挂(咨询软件无需...