大大毛 的笔记

  DDM's Note

哪怕没有办法一定有说法,
就算没有鸽子一定有乌鸦,
固执无罪 梦想有价,
让他们惊讶.

posts - 14, comments - 23, trackbacks - 0, articles - 58
   :: 首页 ::  :: 联系 ::  :: 管理

Kafka资料落地至MariaDB (带Key的新增、修改)

Posted on 2019-04-11 14:14 大大毛 阅读(429) 评论(0)  编辑  收藏 所属分类: Nifi
需求:
    接收Kafka资料,资料具有Key列(多列),有新增、修改但无删除,需要同步落地至MariaDB
解决方案(仅新增、修改):
    这个场景是最常见的,资料不会有被删除的状态,所有的更新就只有Insert Or Update这两种状态,先上实例的图 (两边的LogMessage是为了接收Fail,有感叹号是避免一起开启的时候它也被开启----这样failure的讯息就不会再卡在Connection中了)


思路:
    因为记录只有新增和修改两种状态,理论上说这两种的SQL非常接近,所以可以做以下考量
    1. Processor层面是否支援Update Or Insert
      > 查网上讯息有个叫Upsert,不过在Nifi中查找,只有一个支援Mongo的组件具有这个功能
    2. DB层面是否支援
      > Maria DB有个 "REPLACE INTO" 的语法是可以支持Insert Or Update,虽然简单看了下介绍说是会依主键或唯一索引去先做定位,如果定位到已经存在则先做删除再进行新增(伪Update),但确实可以达成我们的目的,不是吗?

Processor及其设定:
    ConsumeKafkaRecord,作用是从Kafka中Consume出资料(以Record的形态),这里使用Record是因为源数据就是以Record的方式存上去的 (Avro Schema)
  • Kafka Brokers:Kafka的Broker列表,多个Broker以逗号分隔,类似www.broker1.com:9193,www.broker2.com:9193这样的形式配置
  • Topic Name:需要Consume的Kafka Topic名称
  • Record Reader/Writer:关于Record所需要设定的Reader和Writer,要先行在Configure中设定,当然也要设定好Schema  
  • Group ID:Consumer所要设定的ID,这个的设定要依Kafka的配置来,现在我们一般就只有单个的Partition,所以会要求每个Processor都设定有不同
  • Offset Reset:需要设定为"earliest",这样就会依GroupID没有收过的资料来进行收取,否则就只会收新推上去的资料。第一次玩的兄弟经常坑在GroupID和Offset Reset这两项上,若是收不到资料则有  可能就是GroupID没有换成新的(旧的已经收过一次就不会重新再收),或者是Offset Reset = latest又没有新资料推上去~~~
  • Max Poll Records和SCHEDULING中的Run Schedule:需要根据实际接收的速度来进行调整。经过观察发现Consume的速度超快,但整个Nifi Flow的速度会卡在其它需要做解析或读写DB的Processor外 (通常解析JSON会是前面的关卡),所以任由Consumer的高速读取就会造成整个Nifi流程在后段被卡住。造成这个的主要原因其实就在于kafka处理的高速上,所以当有新换GroupID或新流程时,Kafka上积累的海量资料就会在一瞬间被接收下来,然后就是各种红 (其实红了也没事,它会自动向上推,让前一个Processor停止处理)。
    • 若是常态下的资料推送量就已经超过了你的Nifi处理速度,那么就要考量使用多个线程处理或者是从源头的Kafka上就把资料分割开来  
    • SCHEDULING的Cocurrent Tasks:这个Default=1,就是当前Processor需要开起来的线程数。但是这个设置需要当心,你需要仔细考量过你的资料流是否允许乱序 (多线程时当然不可能还能保证资料处理的顺序),所以它是仅适用于不Care资料处理顺序的场景,例如每笔Key就只会有一笔资料,而且哪笔资料先收后收无所谓

    SplitJson,作用就只是简单的把一个JSON数组切开成单个的JSON。Consume出来的会是个数组,这跟你存放进去的单笔讯息是不是数组没什么关系。

    Connection,就是Processor中的那根带箭头的连线,它的作用是连接不同的Processor并且它还具有缓存池的的一个用途,除了把数据从A导流向B外,还可以将B暂时处理不动的资料存放在自带的缓存池中,若是缓存池达到上限,则Nifi会自动让A暂停处理直至B缓过劲~~~
  • Back Pressure Object Threshold / Back Pressure Data Size Threshold:最大缓存的消息笔数 / 最大缓存消息的体积,两者任一超过就会让上游Processor处理暂停
  • Available Prioritizers:出入缓存池的顺序控制,Default是空,通常来说都应该要设成FIFO先进先出的方式
    • 不设定这个经常会造成Nifi资料处理丢失的假象,A1,A2,A3,A4,最后看到的不是A4而是A3,会让人以为A4被玩掉了,其实只是A4被先处理,而A3变成了最后一笔状态。而且这种错误很难被发现!!


    EvaluateJsonPath1,这个元件的作用是解析JSON,它也只能简单的解析,想在Value中对取出来的值做一些处理好象是不允许的....
  • Destination:表示解析出来的内容是成为Attribute,还是直接替换Flow File内容,这里设定是做为属性,所以Processor处理后就可以在Flow File上看到多出自定义的那些属性以及它们的值
  • Return Type:返回值的类型,这种简单从JSON中取值的可以使用Auto-detect即可
  • Path Not Found Behavior:是说如果设定需要解析的JSON路径不存在时的处理行为
  • Null Value Representation:这个对于Null值的处理, "empty string"会将null设为空字符串(MO=),另外一个"the string 'null'"则是会将null设为"null"这样的字符串 (MO="null")
  • MO/MODELFAMILY/....:这些是我手工添加的属性名称,需要根据JSON长样来设,对应Value设定的$.MO则是表示MO的值来源于JSON第一层的"MO"节点。
    • 需要注意的一点是属性名称貌似是会区分大小写的,所以可以看到我全部使用的大写
  • 截图是运行时态的Procssor,停止运行时PROPERTIES上会有一个 + 号,点它即可以新增自己的属性
    • 有一点比较奇怪的地方,就是通过+号维护进去的多个属性,它们的排列顺序却不是你手工新增的顺序,这点引发另外一处的一个疑问,会在下面讲

    EvaluateJsonPath2,当然也是要从JSON中解析,只不过我是要把整个JSON的内容都保留下来,由于它们要求的设定不同,所以被迫要撕成两个元件来做
  • Destination:这个设定仍然是属性
  • Return Type:json,第一个解析元件虽然可以随意设置,但把这两种合并成一个元件并使用Auto时就会报错,所以看起来第一种简单属性实际上只支持Scalar吧...
  • JSONDATA:我定义的一个属性名称,注意Value中设定的"@"符号,它表示整份FlowFile的内容(前面已经转成一个JSON)
  • 这个JSONDATA是因为我的需求,因为Kafka上的资料来源于其它系统,而我其实只需要其中的少量几个栏位 (前一个EvaluateJsonPath解析的那些),为了备查数据上的其它栏位以及在后续使用,所以才要把整份JSON都保留到DB中去 (说得这么高端,实际的原因却是他们的JSON属性是用程序硬拼字串拼出来的,有的东西实在是在Nifi中搞不出来......)

    UpdateAttribute,元件用途是对FlowFile的Attrubute进行修改,这里是拿来对解析出来的值进行再加工以及添加新属性
  • Delete Attributes Expression:这个属性如果有设置就表示该Processor为Delete属性的状态,会忽略你新加的那些属性处理,只专心做好一件事"删除符合条件的属性"
  • PROVIDER:这是一个新的属性,它并没有包含在JSON中,是为表示数据来源而新加的
  • SO:这个就是前面
    EvaluateJsonPath1解析出来的某个值,那个元件无法直接加工,所以放在这里做的二次加工,去掉前导0

    AttributesToJson,作用是将一堆Attribute转换为Json,当然就只能是那种简单结构的Json,这里使用它是为了配合后面一个Processor的使用
  • Attributes List:拿来生成JSON的属性列表,这里我其实把EvaluateJsonPath1、EvaluateJsonPath2和UpdateAttribute产生的属性都放上去了 (它们就是我落地MariaDB的Table列)
    • 不得不说的一个灰常遗憾的结果:那就是生成的JSON属性顺序绝对不是你在List中写的属性顺序,我比较怀疑是在前面几个组件生成Attribute的顺序,但更让人遗憾的是它们的顺序也不会是你维护它们的顺序。这个结果会导致我们在另外的Case 2中会碰到一个不可逾越的障碍~~~~
  • Attributes Regular Expression:符合条件的正则表达式
  • Destination: 这个属性在 
    EvaluateJsonPath上
    就有, 它可以让结果成为一个新的属性还是直接替换FlowFile的内容, Default是直接换掉FlowFile的内容。

    ConvertJsonToSQL,作用是根据JSON内容转换成SQL语句以及语句所要的参数,经过这一关后FlowFile的内容就变成SQL语句,然后Attribute中多出一些参数

  • JDBC Connection Pool:Configrue中指定的MariaDB连接字符串,那里直接有指定Schema
  • Statement Type:这个有INSERT、UPDATE、DELETE这3个选项,若是Mongo的那个组件就会看到有UPSERT(Update or Insert),其它各类的都木有~~~,这里我使用的是INSERT选项,后面通过玩的一点小花招把它再折腾为REPLACE INTO
  • Update Keys: 这个属性可以不填,它是For Update时使用的
  • SQL Parameter Attribute Prefix: default = sql,它其实影响到组件处理后生成的SQL语句参数叫什么,设为sql,最后就会看到生成出来
    。如下图就是处理后的Attribute样式,它会产生sql.args.X.type和sql.args.X.value,这一组合起来就对应于SQL中第一个?参数的类型及值,”sql"就是我们这里设置的前缀名称 (充分考虑到大家会想要搞事)

    ReplaceText,作用是文本替换,这里就是我们处理Update Or Insert的关键,直接把SQL语句换掉它
  • Search Value:在FlowFile中查找的字符串,它支持正则
  • Replacement Value:替换的值,这里就是简单的把Insert Into (x1,x2,x3) values (?,?,?)处理为Replace Into (x1,x2,x3) values (?,?,?)而已,Replace Or Insert的行为交给DB去做

    PutSQL,作用是在DB上执行一段SQL语句
  • JDBC Connection Pool:前面ConvertJsonToSQL转换SQL时就有用过,指定数据库的连接
  • SQL Statement:需执行的SQL,为空时表示使用前面传递过来的FlowFile的内容(已经是一个SQL语句)


总结:
    这是一个带有Key值(多个Key列)的无删除行为的资料接收,所以可以利用AttributeToJSON去将提取出来的有用属性重新生成JSON文件,并直接利用ConvertJsonToSQL转换为Insert语句及对应的绑定参数,这里借用了MariaDB提供的Replace Into机制去自动使用表上的Key键去做Update更新,所以整个Nifi Flow还是比较简单。在后续文章中会讲到带Delete行为的资料接收方法以及无Key更新的解决方案

只有注册用户登录后才能发表评论。


网站导航:
 

i am ddm