当前位置: 首页 > news >正文

java Flink(四十二)Flink的序列化以及TypeInformation介绍(源码分析)

Flink的TypeInformation以及序列化

TypeInformation主要作用是为了在 Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。

Flink 数据类型 & TypeInformation信息_flink typeinformation-CSDN博客

每一个具体的数据类型都对应一个TypeInformation的具体实现,每一个TypeInformation都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink的序列化过程图可以看到TypeInformation会提供一个createSerialize()方法,通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象TypeSerializer。
Flink 数据序列化_flink的序列化-CSDN博客

可以看出,TypeInformation在flink的序列化中起了很重要的作用

源码分析

Basic类型

Flink建议通过Pojo进行数据传入,如果传入的数据类型不满足Pojo条件或者不是Flink支持的基础类型,那么就会通过Kryo进行序列化,效率较低

创建一个包含给定元素的新数据流。元素都必须是相同的类型

(先看基本类型的数据传入)

ctrl+左键点击进入

TypeExtractor:一种用于对类进行反射分析的实用程序,用于确定转换函数实现的返回类型。

初始化 TypeInformation 根据数据第一个元素进行判断返回类型

 传入第一个元素

 

 ctrl+左键点击红框方法进入

如果用于此类型或超类型,则使用工厂创建类型信息。否则返回null。

检查我们是否可以从元组中提取类型,否则使用该类

同上,如果是Row类型,则进入该代码块 

 显然我们这次的例子是Basic 中的String类型,所以进入 privateGetForClass

从给定的类(如Integer、String[]或POJO)中创建类型信息。

检查是否可以使用工厂生产类型信息

 对象作为泛型类型信息处理,这里返回的 GenericTypeInfo 就是Flink定义的泛型类型的TypeInfo

如果类型满足,则返回泛型

 如果类型为数组,则进入

如果类型为hadoop writable则进入

 

如果是Basic类型的一种,则命中

显然我们的String类型数据命中了Basic

一路返回到 fromElements,typeInfo里的信息包含了选择序列化的类型

 进入fromCollection:从给定的非空集合创建数据流。

 不能有null元素和混合元素

 最终返回的DataStreamSource数据,typeInfo数据被封装到了其中

Pojo类型

 跟Basic类一样,一步一步进入到privateGetForClass

经过一堆判断,都没有命中后走到pojo类判断

 

判断必须是Public类型类

不能全都是static或者transient字段

  循环读取每个field,字段必须有Get、Set方法

 每个字段创建对应的typeInfo添加到pojoFields

 最后返回的DataStream,可以看到Pojo的每个字段以及字段对应的序列化类型都包装进去

相关文章:

  • 探索ChatGPT时代下的下一代信息检索系统:机遇与挑战
  • 系统资源耗尽对服务器的影响
  • Linux 系统日志
  • (一)Linux+Windows下安装ffmpeg
  • docker opensearch arm64 运行失败解决方案
  • 国内ip切换是否合规?
  • 针对ETC系统的OBE-SAM模块设计方案
  • python --- 练习题3
  • AI基础知识(3)--神经网络,支持向量机,贝叶斯分类器
  • S32 Design Studio 中断
  • python日常刷题(一)
  • web蓝桥杯真题:灯的颜色变化
  • Java使用itextpdf往pdf中插入图片
  • Python Windows系统 虚拟环境使用
  • JavaEE 初阶篇-深入了解进程与线程(常见的面试题:进程与线程的区别)
  • JavaScript-如何实现克隆(clone)函数
  • 【腾讯Bugly干货分享】从0到1打造直播 App
  • android 一些 utils
  • Android路由框架AnnoRouter:使用Java接口来定义路由跳转
  • Bytom交易说明(账户管理模式)
  • CSS实用技巧干货
  • DataBase in Android
  • ES2017异步函数现已正式可用
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • golang中接口赋值与方法集
  • JavaWeb(学习笔记二)
  • Java比较器对数组,集合排序
  • Lsb图片隐写
  • Node项目之评分系统(二)- 数据库设计
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • SQLServer之创建数据库快照
  • SwizzleMethod 黑魔法
  • 测试如何在敏捷团队中工作?
  • 得到一个数组中任意X个元素的所有组合 即C(n,m)
  • 关于Flux,Vuex,Redux的思考
  • 那些年我们用过的显示性能指标
  • 微信端页面使用-webkit-box和绝对定位时,元素上移的问题
  • 原生Ajax
  • 【干货分享】dos命令大全
  • Spring Batch JSON 支持
  • ​LeetCode解法汇总1410. HTML 实体解析器
  • ​ssh免密码登录设置及问题总结
  • #QT(TCP网络编程-服务端)
  • #我与Java虚拟机的故事#连载07:我放弃了对JVM的进一步学习
  • (k8s中)docker netty OOM问题记录
  • (ZT)出版业改革:该死的死,该生的生
  • (安卓)跳转应用市场APP详情页的方式
  • (附源码)SSM环卫人员管理平台 计算机毕设36412
  • (七)微服务分布式云架构spring cloud - common-service 项目构建过程
  • (三)centos7案例实战—vmware虚拟机硬盘挂载与卸载
  • (五)网络优化与超参数选择--九五小庞
  • (原創) 系統分析和系統設計有什麼差別? (OO)
  • (转)Mysql的优化设置
  • *2 echo、printf、mkdir命令的应用
  • .\OBJ\test1.axf: Error: L6230W: Ignoring --entry command. Cannot find argumen 'Reset_Handler'