File tree 1 file changed +48
-0
lines changed
1 file changed +48
-0
lines changed Original file line number Diff line number Diff line change @@ -44,6 +44,54 @@ The received data is a DataStream of objects of the class **`NgsiEvent`**. This
44
44
45
45
- ** ` metadata ` ** : Additional metadata.
46
46
47
+
48
+ ### NGSILDSource
49
+
50
+ - Import dependency.
51
+
52
+ ``` scala
53
+ import org .fiware .cosmos .orion .flink .connector .{NGSILDSource }
54
+ ```
55
+
56
+ - Add source to Flink Environment. Indicate what port you want to listen to (e.g. 9001).
57
+
58
+ ``` scala
59
+ val env = StreamExecutionEnvironment .getExecutionEnvironment
60
+ val eventStream = env.addSource(new NGSILDSource (9001 ))
61
+ ```
62
+
63
+ - Parse the received data.
64
+
65
+ ``` scala
66
+ val processedDataStream = eventStream.
67
+ .flatMap(event => event.entities)
68
+ // ...processing
69
+ ```
70
+
71
+ The received data is a DataStream of objects of the class ** ` NgsiEvent ` ** . This class has the following attributes:
72
+
73
+ - ** ` creationTime ` ** : Timestamp of arrival.
74
+
75
+ - ** ` service ` ** : FIWARE service extracted from the HTTP headers.
76
+
77
+ - ** ` servicePath ` ** : FIWARE service path extracted from the HTTP headers.
78
+
79
+ - ** ` entities ` ** : Sequence of entites included in the message. Each entity has the following attributes:
80
+
81
+ - ** ` id ` ** : Identifier of the entity.
82
+
83
+ - ** ` type ` ** : Node type.
84
+
85
+ - ** ` attrs ` ** : Map of attributes in which the key is the attribute name and the value is an object with the
86
+ following properties:
87
+
88
+ - ** ` type ` ** : Type of value (Float, Int,...).
89
+
90
+ - ** ` value ` ** : Value of the attribute.
91
+
92
+ - ** ` @context ` ** : Map of terms to URIs providing an unambiguous definition.
93
+
94
+
47
95
### OrionSink
48
96
49
97
- Import dependency.
You can’t perform that action at this time.
0 commit comments