Skip to content

Commit 97bbbca

Browse files
committed
Add kafka index source
1 parent a5470ce commit 97bbbca

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

index/kafka.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2022 National Library of Norway.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package index
18+
19+
import (
20+
"context"
21+
"errors"
22+
"io"
23+
"time"
24+
25+
"github.com/segmentio/kafka-go"
26+
)
27+
28+
type KafkaIndexOption func(options *kafka.ReaderConfig)
29+
30+
func WithBrokers(brokers []string) KafkaIndexOption {
31+
return func(options *kafka.ReaderConfig) {
32+
options.Brokers = brokers
33+
}
34+
}
35+
36+
func WithGroupID(groupID string) KafkaIndexOption {
37+
return func(options *kafka.ReaderConfig) {
38+
options.GroupID = groupID
39+
}
40+
}
41+
42+
func WithTopic(topic string) KafkaIndexOption {
43+
return func(options *kafka.ReaderConfig) {
44+
options.Topic = topic
45+
}
46+
}
47+
48+
func WithMinBytes(minBytes int) KafkaIndexOption {
49+
return func(options *kafka.ReaderConfig) {
50+
options.MinBytes = minBytes
51+
}
52+
}
53+
54+
func WithMaxBytes(maxBytes int) KafkaIndexOption {
55+
return func(options *kafka.ReaderConfig) {
56+
options.MaxBytes = maxBytes
57+
}
58+
}
59+
60+
func WithMaxWait(maxWait time.Duration) KafkaIndexOption {
61+
return func(options *kafka.ReaderConfig) {
62+
options.MaxWait = maxWait
63+
}
64+
}
65+
66+
type KafkaIndexer struct {
67+
kafka.ReaderConfig
68+
Queue
69+
}
70+
71+
func (k KafkaIndexer) Run(ctx context.Context) (err error) {
72+
defer func() {
73+
r := recover()
74+
switch v := r.(type) {
75+
case error:
76+
err = v
77+
}
78+
}()
79+
80+
r := kafka.NewReader(k.ReaderConfig)
81+
defer r.Close()
82+
83+
for {
84+
msg, err := r.ReadMessage(ctx)
85+
if errors.Is(err, io.EOF) {
86+
return nil
87+
}
88+
if err != nil {
89+
return err
90+
}
91+
k.Add(string(msg.Value))
92+
}
93+
}
94+
95+
func NewKafkaIndexer(q Queue, options ...KafkaIndexOption) KafkaIndexer {
96+
readerConfig := new(kafka.ReaderConfig)
97+
for _, apply := range options {
98+
apply(readerConfig)
99+
}
100+
return KafkaIndexer{
101+
ReaderConfig: kafka.ReaderConfig{},
102+
Queue: q,
103+
}
104+
}

0 commit comments

Comments
 (0)