Skip to content

liujiawinds/flink-cep-perf

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

11 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

#Pattern 性能测试

测试环境

 flink 版本:1.4.2
 cpu:Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz
 内存:16G (分配给flink的)

测试背景

控制pattern的输出为1%

典型pattern测试

n分钟m次构造测试程序

Q:怎么控制pattern的输出为1%?

A:先确定pattern 的次数设置阈值是多少,然后,阈值 * 100 * 事件发送间隔时间(5s) 即为异常事件插入时间间隔。即,每发送100个事件后连续插入异常事件,插入异常事件个数大于设定的阈值。需要注意的是,设定事件次数的阈值 * 事件发送间隔时间需要小于等于窗口时间,否则pattern不会触发。

pattern代码

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start")
                .where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        return value.getString("user").equals("kevin");
                    }
                })
                .timesOrMore(10).greedy()
                .within(Time.minutes(20));

细分场景

  • 满足simple condition的数据很少

测试代码

 速度极快,在一个并行度的情况下都超过42万eps

job 截图:dashborad Screenshot

  • 满足simple condition的数据很多,但是被次数设定给限制住了(连续产生threshold - 1的异常事件)

测试代码

速度还是很快,在一个并行度11万eps左右

job 截图:dashborad Screenshot

N分钟移动M公里

Q: 异常点构造方式?

A: 每一百个事件插入一个异常位置的登录事件。

pattern 代码

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("prev")
                .where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject event) throws Exception {
                        return event.getString("event_type").equals("logon");
                    }
                })
                .followedBy("curr")
                .where(new IterativeCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject currentEvent, Context<JSONObject> ctx) throws Exception {
                        if (!currentEvent.getString("event_type").equals("logon")) {
                            return false;
                        }
                        Iterable<JSONObject> iterator = ctx.getEventsForPattern("prev");
                        JSONObject previousEvent = null;
                        for (JSONObject jsonObject : iterator) {
                            previousEvent = jsonObject;
                        }
                        return CEPUtil.geoDistance(previousEvent.getString("geo"), currentEvent.getString("geo")) > 100;
                    }
                })
                .within(Time.minutes(10));

细分场景

  • 时间范围内的数据很少,绝大多数的事件都被window time给过滤掉了。

测试代码

性能情况不错,差不多26w eps

job 截图:dashborad Screenshot

  • 时间范围内的数据很多(每个窗口内包含100条数据),但是距离不足100km

测试代码

性能情况很差了,3.5k eps

job 截图:dashborad Screenshot

使用jvsisualvm查看cpu 抽样耗时70%消耗在 geo运算上面,因为窗口内的数据会两两进行位置运算,所以每一条数据到来都会进行100次的geo运算。

  • 时间范围内的数据减少至10个,每10个事件插入一个异常登录点的事件

测试代码

性能情况变好,接近9w eps

job 截图:dashborad Screenshot

再看jvisualvm 查看cpu 抽样耗时64在fastjson 的jsonobject hashcode方法上了(CEP的SharedBuffer在查询数据的时候会调用事件的hashCode方法)。改用自己写的实体类替换,性能提升2倍。(对比测试代码为:改前改后)这个留待以后需要的时候进行优化。 dashborad Screenshot

JSONObject:

dashborad Screenshot

自己写的Event实体类:

dashborad Screenshot

About

测试带窗口pattern性能

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages