Skip to content

Commit bb6d416

Browse files
WTZ468071157 authored and tiezhu committed
[demo] add flink-demo
1 parent b1e7b89 commit bb6d416

File tree

10 files changed

+423
-6
lines changed

10 files changed

+423
-6
lines changed

flink-core/pom.xml

+12
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,18 @@ under the License.
121121
<scope>test</scope>
122122
</dependency>
123123

124+
<!-- NOTE(review): log4j 1.2.17 is end-of-life and has known CVEs
     (e.g. CVE-2019-17571 in SocketServer); Flink has moved off log4j 1.x.
     Presumably added for quick demo logging - confirm before merging. -->
<dependency>
	<groupId>log4j</groupId>
	<artifactId>log4j</artifactId>
	<version>1.2.17</version>
</dependency>

<!-- NOTE(review): slf4j-simple is an SLF4J *binding*; adding it to flink-core
     in compile scope will clash with the logging backend bundled by Flink
     distributions (multiple-bindings warning, wrong backend selected).
     Consider <scope>test</scope> if it is only needed for local runs. -->
<dependency>
	<groupId>org.slf4j</groupId>
	<artifactId>slf4j-simple</artifactId>
	<version>1.7.30</version>
</dependency>
135+
124136
</dependencies>
125137

126138
<build>

flink-demo/pom.xml

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>flink-parent</artifactId>
		<groupId>org.apache.flink</groupId>
		<version>1.12-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>
	<modelVersion>4.0.0</modelVersion>

	<artifactId>flink-demo</artifactId>
	<name>Flink : Demo</name>
	<packaging>jar</packaging>

	<dependencies>
		<!-- All Flink modules are siblings under flink-parent; ${project.version}
		     keeps them in lock-step instead of repeating the hard-coded
		     1.12-SNAPSHOT nine times. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${project.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>
		<!-- NOTE(review): flink-table-planner-blink and flink-table-planner are
		     mutually exclusive at runtime; having both on the classpath causes
		     ambiguous planner discovery. Keep only the one the demo targets. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-yarn_2.11</artifactId>
			<version>${project.version}</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.2.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<filters>
								<filter>
									<!-- Strip jar signature files; leftover signatures make the
									     shaded fat jar fail with a SecurityException at runtime. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<!-- BUG FIX: the original appended only two hand-picked
								     META-INF/services files (PipelineExecutorFactory and
								     YarnJobClusterExecutorFactory); every other ServiceLoader
								     registration (other executors, connector factories, table
								     factories) was silently clobbered in the shaded jar.
								     ServicesResourceTransformer merges ALL service files. -->
								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.wtz.demo;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Static helpers for attaching Kafka sources and sinks to a Flink streaming job. */
public class KafkaCore {

    /** Utility class; not meant to be instantiated. */
    private KafkaCore() {}

    /**
     * Builds consumer properties pointing at the given brokers.
     *
     * <p>NOTE(review): no {@code group.id} is set here; presumably Flink's consumer
     * supplies/ignores it in this setup - confirm offset-commit behavior is as intended.
     *
     * @param kafkaBootstrapServers comma-separated {@code host:port} broker list
     * @return properties with only {@code bootstrap.servers} populated
     */
    public static Properties buildKafkaConsumerProperties(String kafkaBootstrapServers) {
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        return kafkaProperties;
    }

    /**
     * Adds a Kafka source for {@code topic} to the environment, decoding records as
     * UTF-8 strings and starting from the latest offsets (history is skipped).
     *
     * @param env the stream environment to attach the source to
     * @param kafkaProperties consumer properties (must contain {@code bootstrap.servers})
     * @param topic topic to consume
     * @return the resulting string stream, named "kafkaSource"
     */
    public static DataStream<String> addKafkaSource(
            StreamExecutionEnvironment env, Properties kafkaProperties, String topic) {
        FlinkKafkaConsumer<String> kafkaSource =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProperties);
        // Consume only newly arriving data.
        kafkaSource.setStartFromLatest();
        return env.addSource(kafkaSource).name("kafkaSource");
    }

    /**
     * Builds producer properties pointing at the given brokers.
     *
     * @param kafkaBootstrapServers comma-separated {@code host:port} broker list
     * @return properties with only {@code bootstrap.servers} populated
     */
    public static Properties buildKafkaProducerProperties(String kafkaBootstrapServers) {
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        return kafkaProperties;
    }

    /**
     * Attaches a Kafka sink for {@code topic} to the given stream.
     *
     * @param source stream whose elements are written to Kafka
     * @param kafkaProducer producer properties (must contain {@code bootstrap.servers})
     * @param topic topic to produce to
     */
    public static void addKafkaSink(
            DataStream<String> source, Properties kafkaProducer, String topic) {
        // BUG FIX: the original used String::getBytes, which encodes with the JVM's
        // platform-default charset. On a non-UTF-8 JVM the produced bytes would not
        // round-trip with the UTF-8 SimpleStringSchema used by addKafkaSource.
        // Encode explicitly as UTF-8.
        FlinkKafkaProducer<String> kafkaSink =
                new FlinkKafkaProducer<>(
                        topic,
                        (SerializationSchema<String>)
                                element -> element.getBytes(StandardCharsets.UTF_8),
                        kafkaProducer);
        source.addSink(kafkaSink).name("kafkaSink");
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package com.wtz.demo.data;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

/**
 * Mutable POJO carried through the demo streaming jobs.
 *
 * <p>Note: the 5-argument constructor leaves {@code age} and {@code sex} as
 * {@code null}; callers that auto-unbox {@link #getSex()} must null-check first.
 */
public class DataEntity {
    private Integer id;

    private String name;

    private Integer age;

    private Timestamp birth;

    private Date todayDate;

    private Time todayTime;

    private Boolean sex;

    /** Partial constructor used by the demo source; {@code age} and {@code sex} stay null. */
    public DataEntity(Integer id, String name, Timestamp birth, Time todayTime, Date todayDate) {
        this.id = id;
        this.name = name;
        this.birth = birth;
        this.todayTime = todayTime;
        this.todayDate = todayDate;
    }

    /** Full constructor populating every field. */
    public DataEntity(
            Integer id,
            String name,
            Integer age,
            Timestamp birth,
            Date todayDate,
            Time todayTime,
            Boolean sex) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.birth = birth;
        this.todayDate = todayDate;
        this.todayTime = todayTime;
        this.sex = sex;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Timestamp getBirth() {
        return birth;
    }

    public void setBirth(Timestamp birth) {
        this.birth = birth;
    }

    public Date getTodayDate() {
        return todayDate;
    }

    public void setTodayDate(Date todayDate) {
        this.todayDate = todayDate;
    }

    public Time getTodayTime() {
        return todayTime;
    }

    public void setTodayTime(Time todayTime) {
        this.todayTime = todayTime;
    }

    public Boolean getSex() {
        return sex;
    }

    public void setSex(Boolean sex) {
        this.sex = sex;
    }

    @Override
    public String toString() {
        // BUG FIX: the original omitted `age` and `sex`, so logs printed an
        // incomplete view of the record; all seven fields are now included.
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("id", id)
                .append("name", name)
                .append("age", age)
                .append("birth", birth)
                .append("todayDate", todayDate)
                .append("todayTime", todayTime)
                .append("sex", sex)
                .toString();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.wtz.demo.soruce;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import com.wtz.demo.data.DataEntity;

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

/**
 * Test source that emits one synthetic {@link DataEntity} per second per subtask,
 * with a per-subtask monotonically increasing id, until the job is cancelled.
 *
 * <p>NOTE(review): the package name "soruce" looks like a typo for "source";
 * renaming it would break the import in StreamDemoTwo, so it is left unchanged here.
 */
public class MyDataSource extends RichParallelSourceFunction<DataEntity> {

    // BUG FIX: cancel() is invoked from a different thread than run(). Without
    // `volatile`, the write to this flag may never become visible to the emitting
    // loop, so the source could keep running forever after cancellation.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<DataEntity> ctx) throws Exception {
        int id = 0;

        while (running) {
            Timestamp birth = new Timestamp(System.currentTimeMillis());
            // Uses the 5-arg constructor: `age` and `sex` remain null downstream.
            ctx.collect(
                    new DataEntity(
                            id++,
                            "tiezhu",
                            birth,
                            new Time(birth.getTime()),
                            new Date(System.currentTimeMillis())));
            // Throttle to one record per second; an interrupt during cancellation
            // propagates out of run(), which is the normal Flink shutdown path.
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.wtz.demo.stream;
2+
3+
import org.apache.flink.streaming.api.datastream.DataStream;
4+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5+
6+
import com.wtz.demo.KafkaCore;
7+
8+
public class StreamDemoOne {
9+
public static void main(String[] args) throws Exception {
10+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
11+
12+
// source
13+
DataStream<String> source =
14+
KafkaCore.addKafkaSource(
15+
env, KafkaCore.buildKafkaConsumerProperties("kudu1:9092"), "tiezhu_in");
16+
// sink
17+
KafkaCore.addKafkaSink(
18+
source, KafkaCore.buildKafkaProducerProperties("kudu1:9092"), "tiezhu_out");
19+
env.execute("Kafka stream Demo");
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.wtz.demo.stream;
2+
3+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
4+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5+
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
6+
7+
import com.wtz.demo.data.DataEntity;
8+
import com.wtz.demo.soruce.MyDataSource;
9+
10+
public class StreamDemoTwo {
11+
public static void main(String[] args) throws Exception {
12+
executeLocalJob();
13+
}
14+
15+
public static void executeLocalJob() throws Exception {
16+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
17+
18+
// add source
19+
DataStreamSource<DataEntity> source = env.addSource(new MyDataSource());
20+
21+
// 添加filter算子
22+
source.filter(DataEntity::getSex);
23+
24+
source.addSink(new PrintSinkFunction<>());
25+
26+
// 构建的DAG应该是 source -> filter -> sink
27+
env.execute("Stream Demo Two");
28+
}
29+
}

0 commit comments

Comments
 (0)