Skip to content

Commit ef73045

Browse files
committed
Merge branch 'dockerinput' of github.com:carlanton/heka into carlanton_docker_input
Conflicts: CHANGES.txt build.bat
2 parents 6352534 + 55e9116 commit ef73045

File tree

10 files changed

+503
-2
lines changed

10 files changed

+503
-2
lines changed

CHANGES.txt

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ Features
4444
* Added separate env loading file and support for NUM_JOBS env var to Windows
4545
build (#971).
4646

47+
* Added DockerLogInput. (issue #1092)
48+
4749
0.7.3 (2014-MM-DD)
4850
==================
4951

CMakeLists.txt

+8-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(HEKA_CAT_EXE "${PROJECT_PATH}/bin/heka-cat${CMAKE_EXECUTABLE_SUFFIX}")
3939

4040
option(INCLUDE_SANDBOX "Include Lua sandbox" on)
4141
option(INCLUDE_MOZSVC "Include the Mozilla services plugins" on)
42+
option(INCLUDE_DOCKER_PLUGINS "Include Docker plugins" on)
4243

4344
find_path(INCLUDE_GEOIP GeoIP.h /usr/local/include /usr/include /opt/local/include)
4445
if (NOT INCLUDE_GEOIP)
@@ -49,6 +50,12 @@ else()
4950
set(PLUGIN_LOADER ${PLUGIN_LOADER} "github.com/mozilla-services/heka/plugins/geoip")
5051
endif()
5152

53+
if (INCLUDE_DOCKER_PLUGINS)
54+
message(STATUS "Docker plugins enabled.")
55+
set(PLUGIN_LOADER ${PLUGIN_LOADER} "github.com/mozilla-services/heka/plugins/docker")
56+
set(TAGS "${TAGS} dockerplugins")
57+
endif()
58+
5259
option(BENCHMARK "Enable the benchmark tests" off)
5360
if (BENCHMARK)
5461
set(BENCHMARK_FLAG -bench .)
@@ -145,7 +152,7 @@ WORKING_DIRECTORY "${HEKA_PATH}/message"
145152
add_dependencies(mocks message_matcher_parser)
146153

147154
add_custom_target(hekad ALL
148-
${GO_EXECUTABLE} install ${LDFLAGS} -tags=\"${TAGS}\" github.com/mozilla-services/heka/cmd/hekad
155+
${GO_EXECUTABLE} install ${LDFLAGS} -tags=${TAGS} github.com/mozilla-services/heka/cmd/hekad
149156
DEPENDS mocks)
150157

151158
if (INCLUDE_DOCUMENTATION)

build.bat

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ if "%NUM_JOBS%"=="" (set NUM_JOBS=1)
55

66
if NOT exist %BUILD_DIR% mkdir %BUILD_DIR%
77
cd %BUILD_DIR%
8-
cmake -DINCLUDE_MOZSVC=false -DCMAKE_BUILD_TYPE=release -G"MinGW Makefiles" ..
8+
cmake -DINCLUDE_MOZSVC=false -DINCLUDE_DOCKER_PLUGINS=false -DCMAKE_BUILD_TYPE=release -G"MinGW Makefiles" ..
99
mingw32-make -j %NUM_JOBS%

cmake/externals.cmake

+4
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ if (INCLUDE_GEOIP)
158158
add_external_plugin(git https://github.com/abh/geoip da130741c8ed2052f5f455d56e552f2e997e1ce9)
159159
endif()
160160

161+
if (INCLUDE_DOCKER_PLUGINS)
162+
git_clone(https://github.com/fsouza/go-dockerclient 0236a64c6c4bd563ec277ba00e370cc753e1677c)
163+
endif()
164+
161165
if (INCLUDE_MOZSVC)
162166
add_external_plugin(git https://github.com/mozilla-services/heka-mozsvc-plugins 91278658b5d52bd45b0b74d54e478a230c0ef0c4)
163167
git_clone(https://github.com/getsentry/raven-go 0cc1491d9d27b258a9b4f0238908cb0d51bd6c9b)
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
2+
DockerLogInput
3+
==============
4+
5+
.. versionadded:: 0.8
6+
7+
The DockerLogInput plugin attaches to all containers running on a host and
8+
sends their logs messages into the Heka pipeline. The plugin is based on
9+
`Logspout <https://github.com/progrium/logspout>`_ by Jeff Lindsay.
10+
Messages will be populated as follows:
11+
12+
- Uuid: Type 4 (random) UUID generated by Heka.
13+
- Timestamp: Time when the log line was received by the plugin.
14+
- Type: `DockerLog`.
15+
- Hostname: Hostname of the machine on which Heka is running.
16+
- Payload: The log line received from a Docker container.
17+
- Logger: `stdout` or `stderr`, depending on source.
18+
- Fields["ContainerID"] (string): The container ID
19+
- Fields["ContainerName"] (string): The container name
20+
21+
Config:
22+
23+
- endpoint (string):
24+
A Docker endpoint. Defaults to "unix:///var/run/docker.sock".
25+
- decoder (string):
26+
The name of the decoder used to further transform the message into a
27+
structured hekad message. No default decoder is specified.
28+
29+
Example:
30+
31+
.. code-block:: ini
32+
33+
[DockerLogInput]
34+
endpoint = "unix:///var/run/docker.sock"

docs/source/config/inputs/index.rst

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ Inputs
77
.. _config_amqp_input:
88
.. include:: /config/inputs/amqp.rst
99

10+
.. _config_docker_log_input:
11+
.. include:: /config/inputs/docker_log.rst
12+
1013
.. _config_file_polling_input:
1114
.. include:: /config/inputs/file_polling.rst
1215

docs/source/config/inputs/index_noref.rst

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ Inputs
55

66
.. include:: /config/inputs/amqp.rst
77

8+
.. include:: /config/inputs/docker_log.rst
9+
810
.. include:: /config/inputs/file_polling.rst
911

1012
.. include:: /config/inputs/http.rst

plugins/docker/attacher.go

+271
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
package docker
2+
3+
// Based on Logspout (https://github.com/progrium/logspout)
4+
//
5+
// Copyright (C) 2014 Jeff Lindsay
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
// SOFTWARE.
24+
25+
import (
26+
"bufio"
27+
"fmt"
28+
"io"
29+
"strings"
30+
"sync"
31+
"time"
32+
33+
"github.com/fsouza/go-dockerclient"
34+
)
35+
36+
type AttachEvent struct {
37+
Type string
38+
ID string
39+
Name string
40+
}
41+
42+
type Log struct {
43+
ID string
44+
Name string
45+
Type string
46+
Data string
47+
}
48+
49+
type Source struct {
50+
ID string
51+
Name string
52+
Filter string
53+
Types []string
54+
}
55+
56+
func (s *Source) All() bool {
57+
return s.ID == "" && s.Name == "" && s.Filter == ""
58+
}
59+
60+
type AttachManager struct {
61+
sync.RWMutex
62+
attached map[string]*LogPump
63+
channels map[chan *AttachEvent]struct{}
64+
client DockerClient
65+
errors chan<- error
66+
}
67+
68+
func NewAttachManager(client DockerClient, attachErrors chan<- error) (*AttachManager, error) {
69+
m := &AttachManager{
70+
attached: make(map[string]*LogPump),
71+
channels: make(map[chan *AttachEvent]struct{}),
72+
client: client,
73+
errors: attachErrors,
74+
}
75+
76+
// Attach to all currently running containers
77+
if containers, err := client.ListContainers(docker.ListContainersOptions{}); err == nil {
78+
for _, listing := range containers {
79+
m.attach(listing.ID[:12])
80+
}
81+
} else {
82+
return nil, err
83+
}
84+
85+
go func() {
86+
events := make(chan *docker.APIEvents)
87+
if err := client.AddEventListener(events); err != nil {
88+
m.errors <- err
89+
}
90+
for msg := range events {
91+
if msg.Status == "start" {
92+
go m.attach(msg.ID[:12])
93+
}
94+
}
95+
m.errors <- fmt.Errorf("Docker event channel is closed")
96+
}()
97+
98+
return m, nil
99+
}
100+
101+
func (m *AttachManager) attach(id string) {
102+
container, err := m.client.InspectContainer(id)
103+
if err != nil {
104+
m.errors <- err
105+
}
106+
name := container.Name[1:]
107+
success := make(chan struct{})
108+
failure := make(chan error)
109+
outrd, outwr := io.Pipe()
110+
errrd, errwr := io.Pipe()
111+
go func() {
112+
err := m.client.AttachToContainer(docker.AttachToContainerOptions{
113+
Container: id,
114+
OutputStream: outwr,
115+
ErrorStream: errwr,
116+
Stdin: false,
117+
Stdout: true,
118+
Stderr: true,
119+
Stream: true,
120+
Success: success,
121+
})
122+
outwr.Close()
123+
errwr.Close()
124+
if err != nil {
125+
close(success)
126+
failure <- err
127+
}
128+
m.send(&AttachEvent{Type: "detach", ID: id, Name: name})
129+
m.Lock()
130+
delete(m.attached, id)
131+
m.Unlock()
132+
}()
133+
_, ok := <-success
134+
if ok {
135+
m.Lock()
136+
m.attached[id] = NewLogPump(outrd, errrd, id, name)
137+
m.Unlock()
138+
success <- struct{}{}
139+
m.send(&AttachEvent{ID: id, Name: name, Type: "attach"})
140+
return
141+
}
142+
}
143+
144+
func (m *AttachManager) send(event *AttachEvent) {
145+
m.RLock()
146+
defer m.RUnlock()
147+
for ch, _ := range m.channels {
148+
// TODO: log err after timeout and continue
149+
ch <- event
150+
}
151+
}
152+
153+
func (m *AttachManager) addListener(ch chan *AttachEvent) {
154+
m.Lock()
155+
defer m.Unlock()
156+
m.channels[ch] = struct{}{}
157+
go func() {
158+
for id, pump := range m.attached {
159+
ch <- &AttachEvent{ID: id, Name: pump.Name, Type: "attach"}
160+
}
161+
}()
162+
}
163+
164+
func (m *AttachManager) removeListener(ch chan *AttachEvent) {
165+
m.Lock()
166+
defer m.Unlock()
167+
delete(m.channels, ch)
168+
}
169+
170+
func (m *AttachManager) Get(id string) *LogPump {
171+
m.Lock()
172+
defer m.Unlock()
173+
return m.attached[id]
174+
}
175+
176+
func (m *AttachManager) ClientPinger(stopChan chan<- error, t *time.Ticker) {
177+
for _ = range t.C {
178+
if len(m.attached) > 0 {
179+
continue
180+
}
181+
182+
if err := m.client.Ping(); err != nil {
183+
stopChan <- err
184+
}
185+
}
186+
}
187+
188+
func (m *AttachManager) Listen(logstream chan *Log, closer <-chan bool) {
189+
source := new(Source) // TODO: Make the source parameters configurable
190+
events := make(chan *AttachEvent)
191+
m.addListener(events)
192+
defer m.removeListener(events)
193+
194+
for {
195+
select {
196+
case event := <-events:
197+
if event.Type == "attach" && (source.All() ||
198+
(source.ID != "" && strings.HasPrefix(event.ID, source.ID)) ||
199+
(source.Name != "" && event.Name == source.Name) ||
200+
(source.Filter != "" && strings.Contains(event.Name, source.Filter))) {
201+
202+
pump := m.Get(event.ID)
203+
pump.AddListener(logstream)
204+
defer func() {
205+
if pump != nil {
206+
pump.RemoveListener(logstream)
207+
}
208+
}()
209+
} else if source.ID != "" && event.Type == "detach" &&
210+
strings.HasPrefix(event.ID, source.ID) {
211+
return
212+
}
213+
case <-closer:
214+
return
215+
}
216+
}
217+
}
218+
219+
type LogPump struct {
220+
sync.RWMutex
221+
ID string
222+
Name string
223+
channels map[chan *Log]struct{}
224+
}
225+
226+
func NewLogPump(stdout, stderr io.Reader, id, name string) *LogPump {
227+
obj := &LogPump{
228+
ID: id,
229+
Name: name,
230+
channels: make(map[chan *Log]struct{}),
231+
}
232+
pump := func(typ string, source io.Reader) {
233+
buf := bufio.NewReader(source)
234+
for {
235+
data, err := buf.ReadBytes('\n')
236+
if err != nil {
237+
return
238+
}
239+
obj.send(&Log{
240+
Data: strings.TrimSuffix(string(data), "\n"),
241+
ID: id,
242+
Name: name,
243+
Type: typ,
244+
})
245+
}
246+
}
247+
go pump("stdout", stdout)
248+
go pump("stderr", stderr)
249+
return obj
250+
}
251+
252+
func (o *LogPump) send(log *Log) {
253+
o.RLock()
254+
defer o.RUnlock()
255+
for ch, _ := range o.channels {
256+
// TODO: log err after timeout and continue
257+
ch <- log
258+
}
259+
}
260+
261+
func (o *LogPump) AddListener(ch chan *Log) {
262+
o.Lock()
263+
defer o.Unlock()
264+
o.channels[ch] = struct{}{}
265+
}
266+
267+
func (o *LogPump) RemoveListener(ch chan *Log) {
268+
o.Lock()
269+
defer o.Unlock()
270+
delete(o.channels, ch)
271+
}

0 commit comments

Comments
 (0)