当前位置: 首页 > news >正文

#调用传感器数据_Flink使用函数之监控传感器温度上升提醒

使用函数ProcessFunction介绍

Flink的ProcessFunction 函数是低阶流处理算子,可以访问流应用程序所有(非循环)基本构建块:

  • 事件 (数据流元素)

  • 状态 (容错和一致性)

  • 定时器 (事件时间和处理时间)

DataStream API提供了ProcessFunction转换算子,可以访问时间戳、注册定时事件。输出特定的一些事件。

ProcessFunction 是一种提供对 KeyedState 和定时器访问的 FlatMapFunction。每在输入流中接收到一个事件,就会调用来此函数来处理。对于容错的状态,ProcessFunction 可以通过 RuntimeContext 访问 KeyedState。

Timers 定时器可以对处理时间和事件时间的变化做一些处理。每次调用 processElement() 都可以获得一个 Context 对象,通过该对象可以访问元素的事件时间戳以及 TimerService。TimerService 可以为尚未发生的事件时间/处理时间实注册回调。当定时器到达某个时刻时,会调用 onTimer() 方法。在调用期间,所有状态再次限定为定时器创建的键,允许定时器操作 KeyedState。

ProcessFunction使用

模拟案例场景:传感器状态信息(这里举例:温度)会隔几秒上传,如果这次上传的温度

比上一次上传的温度高,温度10秒以内都是比较高,就进行持续的提示。

1、这里传感器状态消息模拟从socket接收,消息格式包含二个字段分别是:传感器标识id,传感器温度。

2、接收消息之后对数据流按 传感器标识Id 进行分组,对于温度比上一次高的消息,设置定时器10秒以内如果都是对比上一次温度高,则定时进行提示,如果10秒内对比上一次温度低,则认为温度没有升高,则删除定时器,取消提示。

传感器消息类SensorReading,代码如下:

public class SensorReading {    // 传感器 id    public String sensorId;    // 时间戳    public String timeStamp;    // 温度    public Double temperature;    // 状态描述    public  String lowOrhigt;    // 状态标识    public String status;    public SensorReading() {}    public  SensorReading(String sensorId,String timeStamp,Double temperature){        this.sensorId = sensorId;        this.timeStamp = timeStamp;        this.temperature = temperature;    }    public  SensorReading(String sensorId,Double temperature,String status){        this.sensorId = sensorId;        this.temperature = temperature;        this.status=status;    }    public  SensorReading(String sensorId,Double temperature){        this.sensorId = sensorId;        this.temperature = temperature;    }    @Override    public String toString() {        return "SensorReading{" +                "sensorId='" + sensorId + '\'' +                ", timeStamp=" + timeStamp +                ", temperature=" + temperature +                ", lowOrhigt=" + lowOrhigt +                ", status=" + status +                '}';    }}

自定义TempIncreKeyedProcessFunction类继承自KeyedProcessFunction,代码如下:

可仔细阅读代码注释

/** * @author: Created By yanshien * @company ChinaUnicom Software ysn * @date: 2020-12-24 15:13 * @version: v1.0 * @description: 监控传感器温度是否上升 **/public class TempIncreKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {    ValueState lastTempState;  // 上一次温度值    ValueState timerTsState;    //  注册定时器的时间戳    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        lastTempState=getRuntimeContext().getState(new ValueStateDescriptor<>("last-temp", Double.class));        timerTsState=getRuntimeContext().getState(new ValueStateDescriptor<>("timer-ts", Long.class));    }    @Override    public void processElement(SensorReading sensorReading, Context context, Collector collector) throws Exception {        // 先取出上一次状态        Double lastTemp = lastTempState.value();        Long timerTs = timerTsState.value();        // 更新温度值        lastTempState.update(sensorReading.temperature);        // 当前温度值和上次温度进行比较,如果温度上升,且没有定时器,注册当前时间10s后进行提醒        // System.out.println( sensorReading.temperature);        if (lastTemp == null) {            lastTemp = sensorReading.temperature;        }        if (timerTs == null) {            timerTs = 0L;        }        if( sensorReading.temperature > lastTemp && timerTs == 0 ){            // 注册当前时间10s后的定时器            Long ts = context.timerService().currentProcessingTime()  +10000L;            context.timerService().registerProcessingTimeTimer(ts);            timerTsState.update(ts);        // 如果温度下降,那么删除定时器        } else if( sensorReading.temperature < lastTemp ){            context.timerService().deleteProcessingTimeTimer(timerTs);            timerTsState.clear();        }    }    @Override    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {        out.collect("传感器" + ctx.getCurrentKey() + "的温度连续" + 10000L/1000 + "秒连续上升");        timerTsState.clear();    }}

主函数代码如下:

/** * @author: Created By yanshien * @company ChinaUnicom Software ysn * @date: 2020-12-24 15:13 * @version: v1.0 * @description: 监控传感器温度是否上升 **/public class ProcessFunctionDemo {    public static void main(String[] args) throws Exception {        // 创建流处理的执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        /*设置使用EventTime作为Flink的时间处理标准,不指定默认是ProcessTime*/        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 这里为了便于理解,设置并行度为1,默认并行度是当前机器的cpu数量        env.setParallelism(1);        // 指定数据源 从socket的9000端口接收数据        // 在 cmd 打开 nc -L -p 9000         DataStream inputStream = env.socketTextStream("localhost", 9000);        DataStream sourceDS = inputStream.filter(new FilterFunction() {                    @Override                    public boolean filter(String line) throws Exception {                        if (null == line || "".equals(line)) {                            return false;                        }                        String[] lines = line.split(",");                        if (lines.length != 2) {                            return false;                        }                        return true;                    }                });        // map转换,将数据转换成SensorReading格式,第一个字段代表是传感器id,第二个字段的代表的是传感器温度        DataStream warningDS = sourceDS.map(new MapFunction() {            @Override            public SensorReading map(String line) throws Exception {                String[] lines = line.split(",");                return new SensorReading(lines[0], Double.valueOf(lines[1]));            }        }).keyBy(new KeySelector() {            @Override            public String getKey(SensorReading value) throws Exception {                return value.sensorId;            }        }).process(new TempIncreKeyedProcessFunction());        // 打印提醒信息        warningDS.print();        env.execute();    }

程序测试

在 cmd 打开执行命令:nc -L -p 9000,然后运行程序,从socket端发送数据如下:

sensor_1,29.7sensor_1,30.9sensor_1,32

测试10秒内发送温度比前一次高,则进行提醒输出,如下图所示:

401ac0dfd59098fc9e4451c9a32c7573.png

如果觉得文章能帮到您,欢迎关注、转发。

相关文章:

  • 内存使用率_微软深度优化Edge:跑分提高13% 内存占用下降27% 体积缩小近50%
  • 程序员转实施工程师_实施工程师到底做什么的?我认为比程序员接触面更广
  • bocketmq 多个消费者同时_RocketMQ 实战之快速入门
  • linux 安装mysql_linux下在线安装mysql(完整版)
  • mysql insert into 时间_关于MySQL的insert添加自动获取日期的now()的用法
  • mysql 存储模板文件_mysql 模板 存储
  • cyq.data 连接mysql_CYQ.Data V5 文本数据库支持SQL语句操作(实现原理解说)-阿里云开发者社区...
  • mysql组复制脑裂_MySQL 组复制介绍
  • 前端websocket更新列表数据_详解前端websocket服务器之数据传输协议
  • stopwords怎么用_用Python画词云图,展示“新冠肺炎”关键词
  • pygame简单小游戏代码_用pygame实现一个简单的垃圾分类小游戏(已获校级二等奖)!...
  • mysql数据迁移到redis_Mysql到Redis的数据迁移方法
  • linux mysql 客户端 安装配置_linux下mysql的安装部署
  • mysql虚拟表的创建_mysql虚拟表
  • win 10 查看mysql密码_win10系统登录mysql时报错1045?查看解决方案
  • 【跃迁之路】【641天】程序员高效学习方法论探索系列(实验阶段398-2018.11.14)...
  • AngularJS指令开发(1)——参数详解
  • - C#编程大幅提高OUTLOOK的邮件搜索能力!
  • co模块的前端实现
  • crontab执行失败的多种原因
  • ES10 特性的完整指南
  • express + mock 让前后台并行开发
  • golang中接口赋值与方法集
  • Java小白进阶笔记(3)-初级面向对象
  • linux安装openssl、swoole等扩展的具体步骤
  • PV统计优化设计
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • SpringCloud(第 039 篇)链接Mysql数据库,通过JpaRepository编写数据库访问
  • yii2权限控制rbac之rule详细讲解
  • 计算机常识 - 收藏集 - 掘金
  • 看图轻松理解数据结构与算法系列(基于数组的栈)
  • 前端技术周刊 2019-02-11 Serverless
  • 扩展资源服务器解决oauth2 性能瓶颈
  • ​​快速排序(四)——挖坑法,前后指针法与非递归
  • ​力扣解法汇总946-验证栈序列
  • (done) 两个矩阵 “相似” 是什么意思?
  • (求助)用傲游上csdn博客时标签栏和网址栏一直显示袁萌 的头像
  • (顺序)容器的好伴侣 --- 容器适配器
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (幽默漫画)有个程序员老公,是怎样的体验?
  • (正则)提取页面里的img标签
  • (转)visual stdio 书签功能介绍
  • ***原理与防范
  • .NET CORE 2.0发布后没有 VIEWS视图页面文件
  • .net 按比例显示图片的缩略图
  • .NET 中选择合适的文件打开模式(CreateNew, Create, Open, OpenOrCreate, Truncate, Append)
  • .Net6 Api Swagger配置
  • .net中调用windows performance记录性能信息
  • @Bean注解详解
  • @requestBody写与不写的情况
  • [ 代码审计篇 ] 代码审计案例详解(一) SQL注入代码审计案例
  • [ 数据结构 - C++]红黑树RBTree
  • [ 英语 ] 马斯克抱水槽“入主”推特总部中那句 Let that sink in 到底是什么梗?
  • []C/C++读取串口接收到的数据程序
  • [20150629]简单的加密连接.txt