Skip to content

Commit

Permalink
Fix bug in TextSocketSink to reconnect on socket failure
Browse files Browse the repository at this point in the history
  • Loading branch information
brianhks committed Jul 2, 2024
1 parent aba5a07 commit 1c89aae
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 50 deletions.
236 changes: 187 additions & 49 deletions metrics4j/src/main/java/org/kairosdb/metrics4j/sinks/TextSocketSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

enum Protocol
{
Expand All @@ -24,12 +26,7 @@ public abstract class TextSocketSink implements MetricSink, Closeable
{
private static final Logger logger = LoggerFactory.getLogger(TextSocketSink.class);
private static final Formatter DEFAULT_FORMATTER = new DefaultFormatter();
protected Socket m_tcpSocket;
protected PrintWriter m_writer;
protected DatagramSocket m_udpSocket;
protected ByteArrayOutputStream m_udpBuffer;
protected InetAddress m_udpAddress;
protected int m_packetSize;
protected TextSocket m_textSocket;

protected String m_host;

Expand All @@ -39,6 +36,12 @@ public abstract class TextSocketSink implements MetricSink, Closeable

protected int m_maxUdpPacketSize = 1024;

protected int m_maxTcpBufferSize = 1024 * 6;

protected int m_retryCount = 2;

protected int m_retryDelay = 500; //retry delay in ms

public void setHost(String host)
{
m_host = host;
Expand All @@ -54,76 +57,215 @@ public void setProtocol(Protocol protocol)
m_protocol = protocol;
}

public void setRetryCount(int retryCount)
{
m_retryCount = retryCount;
}

public void setRetryDelay(int retryDelay)
{
m_retryDelay = retryDelay;
}

public void setMaxUdpPacketSize(int maxUdpPacketSize)
{
m_maxUdpPacketSize = maxUdpPacketSize;
}

protected void openSocket() throws IOException
public void setMaxTcpBufferSize(int maxTcpBufferSize)
{
logger.info("Connecting to {} on port {} {}", m_host, m_port, m_protocol);
m_maxTcpBufferSize = maxTcpBufferSize;
}

if (m_protocol == Protocol.TCP)
{
m_tcpSocket = new Socket(m_host, m_port);
m_writer = new PrintWriter(m_tcpSocket.getOutputStream());
}
else
@FunctionalInterface
public interface FlushFunction
{
void apply() throws IOException;
}

private void retry(FlushFunction flush)
{
int retry = 0;
boolean success = false;

while ((retry <= m_retryCount) && (!success))
{
m_udpSocket = new DatagramSocket();
m_udpAddress = InetAddress.getByName(m_host);
m_udpBuffer = new ByteArrayOutputStream();
m_writer = new PrintWriter(m_udpBuffer);
try
{
flush.apply();
success = true;
}
catch (IOException e)
{
logger.warn("Failed sending metrics to host {}", m_host);
logger.warn("Flush exception", e);
if (retry < m_retryCount)
{
try
{
Thread.sleep(m_retryDelay);

m_textSocket.close();
m_textSocket.connect();
}
catch (InterruptedException ex)
{
Thread.currentThread().interrupt();
retry = m_retryCount; //force retry to stop
}
catch (IOException ioe)
{
logger.warn("Connection failure", ioe);
}
}
}
retry ++;
}

if (!success)
logger.error("Failed sending metrics to host {}", m_host);
}

protected void sendText(String msg)
protected abstract class TextSocket
{
try
protected final String m_host;
protected final int m_port;
protected final int m_bufferSize;
protected final ByteArrayOutputStream m_textBuffer;

protected TextSocket(String host, int port, int bufferSize)
{
m_host = host;
m_port = port;
m_bufferSize = bufferSize;
m_textBuffer = new ByteArrayOutputStream();
}

public void sendText(String msg)
{
logger.debug(msg);
if (m_protocol == Protocol.UDP && (m_packetSize + msg.length() > m_maxUdpPacketSize))
if (m_textBuffer.size() + msg.length() + 1 > m_bufferSize)
{
flushUdp();
retry(this::flush);
}

try
{
m_textBuffer.write(msg.getBytes(StandardCharsets.UTF_8));
m_textBuffer.write('\n');
}
catch (IOException e)
{
logger.error("This should never happen as we are writing to a byte array buffer");
}

m_packetSize = msg.length() + 1; //add 1 for \n
m_writer.println(msg);
}
catch (Exception e)
{
logger.error("Failed sending metrics to host {}", m_host);
}
public abstract void flush() throws IOException;
public abstract void connect() throws IOException;
public abstract void close() throws IOException;
}

private void flushUdp() throws IOException
protected class TCPTextSocket extends TextSocket
{
m_writer.flush();
byte[] buf = m_udpBuffer.toByteArray();
m_udpBuffer.reset();
m_packetSize = 0;
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, m_udpAddress, m_port);
m_udpSocket.send(datagramPacket);
private Socket m_tcpSocket;
private OutputStream m_outputStream;

protected TCPTextSocket(String host, int port, int maxBufferSize)
{
super(host, port, maxBufferSize);
}

@Override
public void connect() throws IOException
{
m_tcpSocket = new Socket(m_host, m_port);
m_outputStream = m_tcpSocket.getOutputStream();
}

@Override
public void flush() throws IOException
{
byte[] buf = m_textBuffer.toByteArray();
m_outputStream.write(buf);
m_outputStream.flush();

m_textBuffer.reset();
}

@Override
public void close() throws IOException
{
m_tcpSocket.close();
}
}

protected void flush()
protected class UDPTextSocket extends TextSocket
{
try
private DatagramSocket m_udpSocket;
private InetAddress m_udpAddress;

protected UDPTextSocket(String host, int port, int maxPacketSize)
{
if (m_protocol == Protocol.UDP)
flushUdp();
else
m_writer.flush();
super(host, port, maxPacketSize);
}
catch (Exception e)

@Override
public void connect() throws IOException
{
logger.error("Failed sending metrics to host {}", m_host);
m_udpAddress = InetAddress.getByName(m_host);
m_udpSocket = new DatagramSocket();
}


@Override
public void flush() throws IOException
{
byte[] buf = m_textBuffer.toByteArray();

DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, m_udpAddress, m_port);
m_udpSocket.send(datagramPacket);

m_textBuffer.reset();
}

@Override
public void close() throws IOException
{
m_udpSocket.close();
}
}

protected void openSocket() throws IOException
{
logger.info("Connecting to {} on port {} {}", m_host, m_port, m_protocol);

m_textSocket.connect();
}

protected void sendText(String msg)
{
logger.debug(msg);
m_textSocket.sendText(msg);
}


protected void flush()
{
retry(m_textSocket::flush);
}

@Override
public void init(MetricsContext context)
{
if (m_protocol == Protocol.TCP)
{
m_textSocket = new TCPTextSocket(m_host, m_port, m_maxTcpBufferSize);
}
else
{
m_textSocket = new UDPTextSocket(m_host, m_port, m_maxUdpPacketSize);
}

try
{
openSocket();
Expand All @@ -138,15 +280,11 @@ public void close()
{
try
{
if (m_tcpSocket != null)
m_tcpSocket.close();

if (m_udpSocket != null)
m_udpSocket.close();
m_textSocket.close();
}
catch (IOException e)
{
e.printStackTrace();
logger.warn("Exception while trying to close socket", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.kairosdb.metrics4jplugin.prometheus;


import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kairosdb.metrics4j.MetricSourceManager;
import org.kairosdb.metrics4j.configuration.MetricConfig;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

class PrometheusSinkTest
{
@BeforeEach
void setUp()
{

}

@AfterEach
public void cleanup()
{
MetricSourceManager.clearConfig();
}

@Test
public void testPrometheusServer() throws IOException, InterruptedException, URISyntaxException
{
MetricConfig metricConfig = MetricConfig.parseConfig("prometheus_test.conf", "not.properties");

MetricSourceManager.setMetricConfig(metricConfig);

MetricSourceManager.addSource("PrometheusSinkTest", "testPrometheusServer",
Collections.emptyMap(), "help", () -> 42);

HttpClient httpClient = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(new URI("http://localhost:9666/metrics")).GET().build();

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

System.out.println(response.body());
assertThat(response.body()).contains("PrometheusSinkTest_testPrometheusServer_value 42.0");

}
}
27 changes: 27 additions & 0 deletions plugins/prometheus/src/test/resources/prometheus_test.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
metrics4j: {

collectors: {
#No collectors are needed for JMX
}

sinks: {
prometheus: {
_class: "org.kairosdb.metrics4jplugin.prometheus.PrometheusSink"
listen-port: 9666
}
}

triggers: {
prometheus: {
_class: "org.kairosdb.metrics4jplugin.prometheus.PrometheusTrigger"
_folder: "/home/bhawkins/programs/metrics4j/m4j-prometheus-1.0.1"
}
}

sources: {
_disabled: false
_trigger: prometheus
_sink: [prometheus]
}

}
Loading

0 comments on commit 1c89aae

Please sign in to comment.