大大毛 的笔记

  DDM's Note

哪怕没有办法一定有说法,
就算没有鸽子一定有乌鸦,
固执无罪 梦想有价,
让他们惊讶.

posts - 14, comments - 23, trackbacks - 0, articles - 58
   :: 首页 ::  :: 联系 ::  :: 管理

Oracle资料推送MQTT

Posted on 2019-04-10 15:25 大大毛 阅读(373) 评论(0)  编辑  收藏 所属分类: Nifi
需求
将资料从Oracle推至MQTT,资料结果使用JSON格式
场景1:直接推送
场景2:仅当资料有变更时才推送

解决方案
QueryDatabaseTable --> ConvertAvroToJSON --> PublishMQTT


Processor及其设定:
QueryDatabaseTable作用是从DB中捞取资料,可以想象Nifi把它转成一个Select语句在执行


  • Database Connection Pooling Service:在Configure中设定的数据库连接,可以在Processor Group中被共用
  • Database Type:这个设定的是Oracle。其实它与数据库连接设定有点重叠,那边已经有指定是哪一种类型的DB,这里需要再指定,我想会不会是利用它来生成不同的Select查询语法?
  • Table Name:表名,我这里使用的是View名称,View其实就已经对Table做出一些限定,可以挑选列及设定查询条件
  • Maximum-value Columns:这个属性很重要,如果不设定则Nifi在查询的时候会捞取所有的资料,如果有设定某个列,则Nifi仅会捞取“新”资料
    • 例如图上设定“BATCHID”这个列。第一次Nifi启动时捞取资料中最大BATCHID = 10,则下一次Nifi再次启动时就只会捞取BATCHID>10的资料,并且会自动记录下已经捞过的最大值
    • 最大值保存在下面这里,Processor上右键选择“View State”


    • 下图可以看到当前最新的值,点“Clear State”则可以将保留值清空(Nifi下次启动时就会捞取所有资料)

  • 对于仅需要简单拉取资料的场景1来说,“Maximum-value Columns”置空即可;而对于仅在资料有更新时才要拉取的场景2来说,则需要设定并且在View中做出一些调整才可以达成
    • 当有资料更新时,则被更新资料的BatchID会是更新的值,所以只要在View中虚拟BatchID列 = Max(BatchID),就可以达成有资料更新才要拉取的效果
ConvertAvroToJSON,将Avro类型资料转换为JSON,这个可以不用改设定

PublishMQTT,将资料Publish到MQTT指定Topic
  • Broker URIMQTT的Broker地址
  • Client ID发布MQTT的Client端ID(注意不要多个Processor使用相同的Client ID,这样Processor容易被卡死)
  • Topic发布至MQTT的Topic名称
  • Retain MessageMQTT的遗言属性,即是否保留推送的消息(若设为false,则仅有当前连上MQTT的客户端才能收到这笔消息)


只有注册用户登录后才能发表评论。


网站导航:
 

i am ddm