-
Notifications
You must be signed in to change notification settings - Fork 877
/
Copy pathProgram.cs
124 lines (112 loc) · 4.84 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright 2018-2020 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using System;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// An example of working with protobuf serialized data and
/// Confluent Schema Registry (v5.5 or later required for
/// Protobuf schema support).
/// </summary>
namespace Confluent.Kafka.Examples.Protobuf
{
class Program
{
static async Task Main(string[] args)
{
if (args.Length != 3)
{
Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
return;
}
string bootstrapServers = args[0];
string schemaRegistryUrl = args[1];
string topicName = args[2];
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers
};
var schemaRegistryConfig = new SchemaRegistryConfig
{
// Note: you can specify more than one schema registry url using the
// schema.registry.url property for redundancy (comma separated list).
// The property name is not plural to follow the convention set by
// the Java implementation.
Url = schemaRegistryUrl,
};
var consumerConfig = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
GroupId = "protobuf-example-consumer-group"
};
CancellationTokenSource cts = new CancellationTokenSource();
var consumeTask = Task.Run(() =>
{
using (var consumer =
new ConsumerBuilder<string, User>(consumerConfig)
.SetValueDeserializer(new ProtobufDeserializer<User>().AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
var user = consumeResult.Message.Value;
Console.WriteLine($"key: {consumeResult.Message.Key} user name: {user.Name}, favorite number: {user.FavoriteNumber}, favorite color: {user.FavoriteColor}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
});
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var producer =
new ProducerBuilder<string, User>(producerConfig)
.SetValueSerializer(new ProtobufSerializer<User>(schemaRegistry))
.Build())
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
long i = 1;
string text;
while ((text = Console.ReadLine()) != "q")
{
User user = new User { Name = text, FavoriteColor = "green", FavoriteNumber = i++ };
await producer
.ProduceAsync(topicName, new Message<string, User> { Key = text, Value = user })
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");
}
}
cts.Cancel();
}
}
}