Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ protected Position getInitialPosition() throws Exception {
/* fourth method: capture the current master position. */
if ( initial == null ) {
try ( Connection c = context.getReplicationConnection() ) {
initial = Position.capture(c, config.gtidMode);
long lastHeartbeatRead = context.getLastHeartbeat();
initial = Position.capture(c, lastHeartbeatRead, config.gtidMode);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public void start() throws IOException {
getPositionStoreThread(); // boot up thread explicitly.
}

public long getLastHeartbeat() throws Exception {
return this.positionStore.getHeartbeat();
}

public long heartbeat() throws Exception {
return this.positionStore.heartbeat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void publishFallback(RecordMetadata md, Exception e) {
// with no fallback topic to avoid infinite loops
KafkaCallback cb = new KafkaCallback(cc, position, key, json,
succeededMessageCount, failedMessageCount, succeededMessageMeter,
failedMessageMeter, topic, null, context, producer);
failedMessageMeter, fallbackTopic, null, context, producer);
producer.enqueueFallbackRow(fallbackTopic, key, cb, md, e);
}

Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/zendesk/maxwell/replication/Position.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ public Position withHeartbeat(long lastHeartbeatRead) {
}

public static Position capture(Connection c, boolean gtidMode) throws SQLException {
return new Position(BinlogPosition.capture(c, gtidMode), 0L);
return capture(c, 0L, gtidMode);
}

public static Position capture(Connection c, long lastHeartbeatRead, boolean gtidMode) throws SQLException {
return new Position(BinlogPosition.capture(c, gtidMode), lastHeartbeatRead);
}

public long getLastHeartbeatRead() {
Expand Down
49 changes: 30 additions & 19 deletions src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class MysqlPositionStore {
static final Logger LOGGER = LoggerFactory.getLogger(MysqlPositionStore.class);
private static final Long DEFAULT_GTID_SERVER_ID = new Long(0);
private static final Long DEFAULT_GTID_SERVER_ID = 0L;
private final Long serverID;
private String clientID;
private final boolean gtidMode;
Expand All @@ -41,7 +40,7 @@ public void set(Position newPosition) throws SQLException, DuplicateProcessExcep
if ( newPosition == null )
return;

Long heartbeat = newPosition.getLastHeartbeatRead();
long heartbeat = newPosition.getLastHeartbeatRead();

String sql = "INSERT INTO `positions` set "
+ "server_id = ?, "
Expand Down Expand Up @@ -87,6 +86,10 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup
});
}

public long getHeartbeat() throws SQLException {
return getHeartbeat(connectionPool.getConnection());
}

/*
* the heartbeat system performs two functions:
* 1 - it leaves pointers in the binlog in order to facilitate master recovery
Expand All @@ -95,37 +98,45 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup

private Long lastHeartbeat = null;

private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException {
String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?";
private long getHeartbeat(Connection c) throws SQLException {
try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) {
s.setLong(1, serverID);
s.setString(2, clientID);

try ( ResultSet rs = s.executeQuery() ) {
if ( !rs.next() ) {
return 0L;
} else {
return rs.getLong("heartbeat");
}
}
}
}

private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException {
String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?";

try ( PreparedStatement s = c.prepareStatement(heartbeatInsert) ) {
s.setLong(1, thisHeartbeat);
s.setLong(2, serverID);
s.setString(3, clientID);

s.execute();
return thisHeartbeat;
} catch ( SQLIntegrityConstraintViolationException e ) {
throw new DuplicateProcessException("Found heartbeat row for client,position while trying to insert. Is another maxwell running?");
}
}

private void heartbeat(Connection c, long thisHeartbeat) throws SQLException, DuplicateProcessException {
if ( lastHeartbeat == null ) {
try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) {
s.setLong(1, serverID);
s.setString(2, clientID);

try ( ResultSet rs = s.executeQuery() ) {
if ( !rs.next() ) {
insertHeartbeat(c, thisHeartbeat);
lastHeartbeat = thisHeartbeat;
return;
} else {
lastHeartbeat = rs.getLong("heartbeat");
}
}
long storedHeartbeat = getHeartbeat(c);

if (storedHeartbeat > 0) {
lastHeartbeat = storedHeartbeat;
} else {
insertHeartbeat(c, thisHeartbeat);
lastHeartbeat = thisHeartbeat;
return;
}
}

Expand Down