@@ -13,6 +13,7 @@ use crate::api_connection;
13
13
use crate :: api_connection:: ApiConnection ;
14
14
use crate :: config:: { Config , ContentTypesSubConfig } ;
15
15
use crate :: data_structures:: { ArbitraryJson , Caches , CliArgs , ContentToRetrieve , JsonList } ;
16
+ use crate :: interfaces:: azure_oms_interface:: OmsInterface ;
16
17
use crate :: interfaces:: interface:: Interface ;
17
18
use crate :: interfaces:: file_interface:: FileInterface ;
18
19
use crate :: interfaces:: fluentd_interface:: FluentdInterface ;
@@ -55,6 +56,9 @@ impl Collector {
55
56
if config. output . graylog . is_some ( ) {
56
57
interfaces. push ( Box :: new ( GraylogInterface :: new ( config. clone ( ) ) ) ) ;
57
58
}
59
+ if config. output . oms . is_some ( ) {
60
+ interfaces. push ( Box :: new ( OmsInterface :: new ( config. clone ( ) , args. oms_key . clone ( ) ) ) ) ;
61
+ }
58
62
59
63
// Initialize collector threads
60
64
let api = api_connection:: get_api_connection (
@@ -94,7 +98,7 @@ impl Collector {
94
98
95
99
/// Monitor all started content retrieval threads, processing results and terminating
96
100
/// when all content has been retrieved (signalled by a final run stats message).
97
- pub fn monitor ( & mut self ) {
101
+ pub async fn monitor ( & mut self ) {
98
102
99
103
let start = Instant :: now ( ) ;
100
104
loop {
@@ -106,12 +110,12 @@ impl Collector {
106
110
}
107
111
// Run stats are only returned when all content has been retrieved,
108
112
// therefore this signals the end of the run.
109
- if self . check_stats ( ) {
113
+ if self . check_stats ( ) . await {
110
114
break
111
115
}
112
116
113
117
// Check if a log came in.
114
- self . check_results ( ) ;
118
+ self . check_results ( ) . await ;
115
119
}
116
120
self . end_run ( ) ;
117
121
}
@@ -120,25 +124,25 @@ impl Collector {
120
124
self . config . save_known_blobs ( & self . known_blobs ) ;
121
125
}
122
126
123
- fn check_results ( & mut self ) {
127
+ async fn check_results ( & mut self ) {
124
128
125
129
if let Ok ( Some ( ( msg, content) ) ) = self . result_rx . try_next ( ) {
126
- self . handle_content ( msg, content) ;
130
+ self . handle_content ( msg, content) . await ;
127
131
}
128
132
}
129
133
130
- fn handle_content ( & mut self , msg : String , content : ContentToRetrieve ) {
134
+ async fn handle_content ( & mut self , msg : String , content : ContentToRetrieve ) {
131
135
self . known_blobs . insert ( content. content_id . clone ( ) , content. expiration . clone ( ) ) ;
132
136
if let Ok ( logs) = serde_json:: from_str :: < JsonList > ( & msg) {
133
137
for log in logs {
134
- self . handle_log ( log, & content) ;
138
+ self . handle_log ( log, & content) . await ;
135
139
}
136
140
} else {
137
141
warn ! ( "Skipped log that could not be parsed: {}" , content. content_id)
138
142
}
139
143
}
140
144
141
- fn handle_log ( & mut self , mut log : ArbitraryJson , content : & ContentToRetrieve ) {
145
+ async fn handle_log ( & mut self , mut log : ArbitraryJson , content : & ContentToRetrieve ) {
142
146
143
147
if let Some ( filters) = self . filters . get ( & content. content_type ) {
144
148
for ( k, v) in filters. iter ( ) {
@@ -154,17 +158,17 @@ impl Collector {
154
158
self . cache . insert ( log, & content. content_type ) ;
155
159
self . saved += 1 ;
156
160
if self . cache . full ( ) {
157
- self . output ( ) ;
161
+ self . output ( ) . await ;
158
162
}
159
163
}
160
- fn check_stats ( & mut self ) -> bool {
164
+ async fn check_stats ( & mut self ) -> bool {
161
165
162
166
if let Ok ( Some ( ( found,
163
167
successful,
164
168
retried,
165
169
failed) ) ) = self . stats_rx . try_next ( ) {
166
170
167
- self . output ( ) ;
171
+ self . output ( ) . await ;
168
172
let output = self . get_output_string (
169
173
found,
170
174
successful,
@@ -180,15 +184,15 @@ impl Collector {
180
184
}
181
185
}
182
186
183
- fn output ( & mut self ) {
187
+ async fn output ( & mut self ) {
184
188
185
189
let mut cache = Caches :: new ( self . cache . size ) ;
186
190
swap ( & mut self . cache , & mut cache) ;
187
191
if self . interfaces . len ( ) == 1 {
188
- self . interfaces . get_mut ( 0 ) . unwrap ( ) . send_logs ( cache) ;
192
+ self . interfaces . get_mut ( 0 ) . unwrap ( ) . send_logs ( cache) . await ;
189
193
} else {
190
194
for interface in self . interfaces . iter_mut ( ) {
191
- interface. send_logs ( cache. clone ( ) ) ;
195
+ interface. send_logs ( cache. clone ( ) ) . await ;
192
196
}
193
197
}
194
198
}
@@ -287,7 +291,7 @@ fn initialize_channels(
287
291
retries : config. collect . retries . unwrap_or ( 3 ) ,
288
292
kill_rx,
289
293
} ;
290
- return ( blob_config, content_config, message_loop_config, blobs_rx, content_rx, result_rx,
294
+ ( blob_config, content_config, message_loop_config, blobs_rx, content_rx, result_rx,
291
295
stats_rx, kill_tx)
292
296
}
293
297
0 commit comments