Skip to content

Commit d128bd7

Browse files
committed
[#noissue] Added function names, idleness in flink
1 parent 89fe796 commit d128bd7

File tree

1 file changed

+75
-119
lines changed

1 file changed

+75
-119
lines changed

flink/src/main/java/com/navercorp/pinpoint/flink/StatStreamingVer2Job.java

+75-119
Original file line numberDiff line numberDiff line change
@@ -15,142 +15,98 @@
1515
*/
1616
package com.navercorp.pinpoint.flink;
1717

18-
/**
19-
* @author minwoo.jung
20-
*/
21-
18+
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinApplicationStatBo;
2219
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
23-
import com.navercorp.pinpoint.flink.dao.hbase.StatisticsDao;
24-
import com.navercorp.pinpoint.flink.function.AgentStatTimestampAssigner;
2520
import com.navercorp.pinpoint.flink.function.ApplicationStatBoWindow;
26-
import com.navercorp.pinpoint.flink.function.ApplicationStatBoFilter;
27-
import com.navercorp.pinpoint.flink.function.ApplicationStatKeySelector;
28-
import com.navercorp.pinpoint.flink.receiver.TcpSourceFunction;
29-
import com.navercorp.pinpoint.flink.vo.RawData;
3021
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
3122
import org.apache.flink.api.java.tuple.Tuple3;
3223
import org.apache.flink.api.java.utils.ParameterTool;
33-
import org.apache.flink.streaming.api.datastream.DataStream;
34-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
35-
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
3624
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3725
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
3826
import org.apache.flink.streaming.api.windowing.time.Time;
39-
import org.apache.logging.log4j.Logger;
4027
import org.apache.logging.log4j.LogManager;
28+
import org.apache.logging.log4j.Logger;
4129

4230
import java.io.Serializable;
31+
import java.time.Duration;
4332

33+
/**
34+
* @author minwoo.jung
35+
* @author youngjin.kim2
36+
*/
4437
public class StatStreamingVer2Job implements Serializable {
45-
private final static Logger logger = LogManager.getLogger(StatStreamingVer2Job.class);
38+
private static final Logger logger = LogManager.getLogger(StatStreamingVer2Job.class);
39+
40+
private static final String JOB_NAME = "Aggregation Stat Data";
4641

4742
public static void main(String[] args) throws Exception {
48-
ParameterTool parameters = ParameterTool.fromArgs(args);
49-
new StatStreamingVer2Job().start(parameters);
43+
final ParameterTool paramTool = ParameterTool.fromArgs(args);
44+
new StatStreamingVer2Job().start(paramTool);
5045
}
5146

52-
public void start(ParameterTool parameters) throws Exception {
53-
logger.info("start Aggregation Stat Data job with job parameter. : " + parameters.toMap() );
54-
final Bootstrap bootstrap = Bootstrap.getInstance(parameters.toMap());
55-
56-
// set data source
57-
final TcpSourceFunction tcpSourceFunction = bootstrap.getTcpSourceFunction();
47+
public void start(ParameterTool paramTool) throws Exception {
48+
logger.info("Starting \"{}\" job with job parameter: {}", JOB_NAME, paramTool.toMap());
49+
final Bootstrap bootstrap = Bootstrap.getInstance(paramTool.toMap());
50+
final FlowParameters flowParams = new FlowParameters(paramTool);
5851
final StreamExecutionEnvironment env = bootstrap.createStreamExecutionEnvironment();
59-
env.getConfig().setGlobalJobParameters(parameters);
60-
DataStreamSource<RawData> rawData = env.addSource(tcpSourceFunction);
61-
62-
//0. generation rawdata
63-
final SingleOutputStreamOperator<Tuple3<String, JoinStatBo, Long>> statOperator = rawData.flatMap(bootstrap.getTbaseFlatMapper());
64-
65-
//1-1 save data processing application stat raw data
66-
final StatisticsDao statisticsDao = bootstrap.getStatisticsDao();
67-
DataStream<Tuple3<String, JoinStatBo, Long>> applicationStatAggregationData = statOperator.filter(new ApplicationStatBoFilter())
68-
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, JoinStatBo, Long>>forMonotonousTimestamps()
69-
.withTimestampAssigner(new AgentStatTimestampAssigner()))
70-
.keyBy(new ApplicationStatKeySelector())
71-
.window(TumblingEventTimeWindows.of(Time.milliseconds(ApplicationStatBoWindow.WINDOW_SIZE)))
72-
.allowedLateness(Time.milliseconds(ApplicationStatBoWindow.ALLOWED_LATENESS))
73-
.apply(new ApplicationStatBoWindow());
74-
applicationStatAggregationData.addSink(statisticsDao);
75-
76-
// 1-2. aggregate application stat data
77-
// statOperator.filter(new FilterFunction<Tuple3<String, JoinStatBo, Long>>() {
78-
// @Override
79-
// public boolean filter(Tuple3<String, JoinStatBo, Long> value) throws Exception {
80-
// if (value.f1 instanceof JoinApplicationStatBo) {
81-
// logger.info("1-2 application stat aggre window function : " + value.f1);
82-
// return true;
83-
// }
84-
//
85-
// return false;
86-
// }
87-
// })
88-
// .assignTimestampsAndWatermarks(new Timestamp())
89-
// .keyBy(0)
90-
// .window(TumblingEventTimeWindows.of(Time.seconds(120)))
91-
// .apply(new WindowFunction<Tuple3<String, JoinStatBo, Long>, Tuple3<String, JoinStatBo, Long>, Tuple, TimeWindow>() {
92-
// @Override
93-
// public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, JoinStatBo, Long>> values, Collector<Tuple3<String, JoinStatBo, Long>> out) throws Exception {
94-
// try {
95-
// JoinApplicationStatBo joinApplicationStatBo = join(values);
96-
// logger.info("1-2 application stat aggre window function : " + joinApplicationStatBo);
97-
// out.collect(new Tuple3<>(joinApplicationStatBo.getId(), joinApplicationStatBo, joinApplicationStatBo.getTimestamp()));
98-
// } catch (Exception e) {
99-
// logger.error("window function error", e);
100-
// }
101-
// }
102-
//
103-
// private JoinApplicationStatBo join(Iterable<Tuple3<String, JoinStatBo, Long>> values) {
104-
// List<JoinApplicationStatBo> joinApplicaitonStatBoList = new ArrayList<JoinApplicationStatBo>();
105-
// for (Tuple3<String, JoinStatBo, Long> value : values) {
106-
// joinApplicaitonStatBoList.add((JoinApplicationStatBo) value.f1);
107-
// }
108-
// return JoinApplicationStatBo.joinApplicationStatBo(joinApplicaitonStatBoList);
109-
//
110-
// }
111-
// }).writeUsingOutputFormat(statisticsDao);
112-
113-
114-
// 2. agrregage agent stat
115-
// statOperator.filter(new FilterFunction<Tuple3<String, JoinStatBo, Long>>() {
116-
// @Override
117-
// public boolean filter(Tuple3<String, JoinStatBo, Long> value) throws Exception {
118-
// if (value.f1 instanceof JoinAgentStatBo) {
119-
// logger.info("2 application stat aggre window function : " + value.f1);
120-
// return true;
121-
// }
122-
//
123-
// return false;
124-
// }
125-
// })
126-
// .assignTimestampsAndWatermarks(new Timestamp())
127-
// .keyBy(0)
128-
// .window(TumblingEventTimeWindows.of(Time.seconds(120)))
129-
//
130-
// .apply(new WindowFunction<Tuple3<String, JoinStatBo, Long>, Tuple3<String, JoinStatBo, Long>, Tuple, TimeWindow>() {
131-
//
132-
// @Override
133-
// public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, JoinStatBo, Long>> values, Collector<Tuple3<String, JoinStatBo, Long>> out) throws Exception {
134-
// try {
135-
// JoinAgentStatBo joinAgentStatBo = join(values);
136-
// logger.info("2 agent stat aggre window function : " + joinAgentStatBo);
137-
// out.collect(new Tuple3<>(joinAgentStatBo.getId(), joinAgentStatBo, joinAgentStatBo.getTimestamp()));
138-
// } catch (Exception e) {
139-
// logger.error("window function error", e);
140-
// }
141-
// }
142-
//
143-
// private JoinAgentStatBo join(Iterable<Tuple3<String, JoinStatBo, Long>> values) {
144-
// List<JoinAgentStatBo> joinAgentStatBoList = new ArrayList<JoinAgentStatBo>();
145-
// for (Tuple3<String, JoinStatBo, Long> value : values) {
146-
// joinAgentStatBoList.add((JoinAgentStatBo) value.f1);
147-
// }
148-
//
149-
// return JoinAgentStatBo.joinAgentStatBo(joinAgentStatBoList);
150-
// }
151-
// })
152-
// .writeUsingOutputFormat(statisticsDao);
153-
154-
env.execute("Aggregation Stat Data");
52+
env.getConfig().setGlobalJobParameters(paramTool);
53+
env
54+
.addSource(bootstrap.getTcpSourceFunction()).name("TcpSourceFunction")
55+
.flatMap(bootstrap.getTbaseFlatMapper()).name("TBaseFlatMapper")
56+
.filter(el -> el.f1 instanceof JoinApplicationStatBo).name("OnlyJoinApplicationStatBo")
57+
.assignTimestampsAndWatermarks(WatermarkStrategy
58+
.<Tuple3<String, JoinStatBo, Long>>forBoundedOutOfOrderness(flowParams.getOutOfOrderness())
59+
.withIdleness(flowParams.getIdleness())
60+
.withTimestampAssigner((el, t) -> el.f2))
61+
.keyBy(el -> el.f0)
62+
.window(TumblingEventTimeWindows.of(flowParams.getWindowSize()))
63+
.allowedLateness(flowParams.getAllowedLateness())
64+
.apply(new ApplicationStatBoWindow()).name("ApplicationStatBoWindow")
65+
.addSink(bootstrap.getStatisticsDao()).name("StatisticsDao");
66+
env.execute(JOB_NAME);
15567
}
68+
69+
private static class FlowParameters {
70+
71+
private static final int DEFAULT_OUT_OF_ORDERNESS_MILLIS = 0;
72+
private static final int DEFAULT_IDLENESS_MILLIS = 10000;
73+
private static final int DEFAULT_WINDOW_SIZE_MILLIS = ApplicationStatBoWindow.WINDOW_SIZE;
74+
private static final int DEFAULT_ALLOWED_LATENESS_MILLIS = ApplicationStatBoWindow.ALLOWED_LATENESS;
75+
76+
private static final String PARAM_PREFIX = "pinpoint.flink.";
77+
private static final String PARAM_OUT_OF_ORDERNESS = PARAM_PREFIX + "outOfOrdernessMillis";
78+
private static final String PARAM_IDLENESS = PARAM_PREFIX + "idlenessMillis";
79+
private static final String PARAM_WINDOW_SIZE = PARAM_PREFIX + "windowSizeMillis";
80+
private static final String PARAM_ALLOWED_LATENESS = PARAM_PREFIX + "allowedLatenessMillis";
81+
82+
private final int outOfOrdernessMillis;
83+
private final int idlenessMillis;
84+
private final int windowSizeMillis;
85+
private final int allowedLatenessMillis;
86+
87+
FlowParameters(ParameterTool params) {
88+
this.outOfOrdernessMillis = params.getInt(PARAM_OUT_OF_ORDERNESS, DEFAULT_OUT_OF_ORDERNESS_MILLIS);
89+
this.idlenessMillis = params.getInt(PARAM_IDLENESS, DEFAULT_IDLENESS_MILLIS);
90+
this.windowSizeMillis = params.getInt(PARAM_WINDOW_SIZE, DEFAULT_WINDOW_SIZE_MILLIS);
91+
this.allowedLatenessMillis = params.getInt(PARAM_ALLOWED_LATENESS, DEFAULT_ALLOWED_LATENESS_MILLIS);
92+
}
93+
94+
Duration getOutOfOrderness() {
95+
return Duration.ofMillis(outOfOrdernessMillis);
96+
}
97+
98+
Duration getIdleness() {
99+
return Duration.ofMillis(idlenessMillis);
100+
}
101+
102+
Time getWindowSize() {
103+
return Time.milliseconds(windowSizeMillis);
104+
}
105+
106+
Time getAllowedLateness() {
107+
return Time.milliseconds(allowedLatenessMillis);
108+
}
109+
110+
}
111+
156112
}

0 commit comments

Comments
 (0)