Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue where disconnect was not reconnecting and added tests #42

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/main/java/com/github/pgasync/ConnectionPoolBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public ConnectionPoolBuilder validationQuery(String validationQuery) {
return this;
}

public ConnectionPoolBuilder validateSocket(boolean validateSocket) {
properties.validateSocket = validateSocket;
return this;
}

/**
* Configuration for a pool.
*/
Expand All @@ -111,6 +116,7 @@ public static class PoolProperties {
boolean useSsl;
boolean usePipelining;
String validationQuery;
boolean validateSocket;

public String getHostname() {
return hostname;
Expand Down Expand Up @@ -140,9 +146,7 @@ public DataConverter getDataConverter() {
return dataConverter != null ? dataConverter : new DataConverter(converters);
}
public Func1<Connection,Observable<Connection>> getValidator() {
return validationQuery == null || validationQuery.trim().isEmpty()
? Observable::just
: new ConnectionValidator(validationQuery)::validate;
return new ConnectionValidator(validationQuery, validateSocket)::validate;
}
}
}
46 changes: 30 additions & 16 deletions src/main/java/com/github/pgasync/impl/ConnectionValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,40 @@
public class ConnectionValidator {

final String validationQuery;
final boolean validateSocket;

public ConnectionValidator(String validationQuery) {
this.validationQuery = validationQuery;
public ConnectionValidator(String validationQuery, boolean validateSocket) {
// Trimmed as empty means no query for backwards compatibility
this.validationQuery = validationQuery == null || validationQuery.trim().isEmpty() ? null : validationQuery;
this.validateSocket = validateSocket;
}

public Observable<Connection> validate(Connection connection) {
return connection.queryRows(validationQuery)
.lift(subscriber -> new Subscriber<Row>() {
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onCompleted() {
subscriber.onNext(connection);
subscriber.onCompleted();
}
@Override
public void onNext(Row row) { }
});
Observable<Connection> ret = Observable.just(connection);
if (validationQuery != null) {
ret = ret.flatMap(conn -> connection.queryRows(validationQuery)
.lift(subscriber -> new Subscriber<Row>() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about instead of this code not do:

   return connection.queryRows(validationQuery)
            .map( __-> connection);

?

it will return connection, ignore the query result and in case of any errors propagate them forward.

Copy link
Contributor Author

@cretz cretz Sep 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My goal here, in adding my socket validation, was to leave the existing logic as untouched as possible to avoid breaking any downstream code expectations. Therefore I simply refactored out what was there and added my check. Your comment applies to the original code as well and, to keep w/ minimal refactoring principles, maybe should be addressed there (i.e. open a PR addressing those concerns w/ the original source).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with your assumptions. It looks tempting to just remove all this code here :)

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onCompleted() {
subscriber.onNext(connection);
subscriber.onCompleted();
}
@Override
public void onNext(Row row) { }
}));
}
if (validateSocket) {
ret = ret.doOnNext(conn -> {
if (conn instanceof PgConnection && !((PgConnection) conn).isConnected()) {
throw new IllegalStateException("Channel is closed");
}
});
}
return ret;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like you got two validation in this file - consider separating them, they do not need to have fields - just function with params.

Also I wander if you need the second one - when you are able to query does it mean you are able to connect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my case, I didn't have a validation query, only a socket validation. They are separate, but I figured placing all forms of validation in ConnectionValidator was reasonable since it's the validation result that determines whether it needs to be refreshed regardless of the method you choose to validate. I was avoiding an over-abstraction here but we could easily abstract out many ways to validate a connection.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just looks like a two separate way of doing validation. Possibly one could have an option to use each one of them etc. This is just general observation about it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should firstly check if there is socket connection then if you can query.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically this socket check doesn't make much sense as if it is closed then the check query will fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaceksokol - I explained this above with "I didn't have a validation query, only a socket validation." I don't want query validation. The socket dies underneath and I don't need a full query to the DB for the client side to detect socket death. People should not have to run queries against the DB to see if the client-side of the socket has died.

I just want the client-side socket death check (without requiring a validation query) and I figured the connection validator is the same place. I put it after because I don't want to change anything for how people rely on things today, just need to tack it on.

}

}
128 changes: 128 additions & 0 deletions src/test/java/com/github/pgasync/impl/ConnectionValidatorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package com.github.pgasync.impl;

import com.github.pgasync.Connection;
import com.github.pgasync.ResultSet;
import com.github.pgasync.SqlException;
import org.junit.Test;
import rx.Observable;

import java.util.function.Consumer;

import static org.junit.Assert.*;

public class ConnectionValidatorTest {

@Test
public void shouldBeTheSamePidOnSuccessiveCalls() {
withDbr(null, true, dbr -> {
// Simple sanity check for our PID assumptions
assertEquals(selectPid(dbr).toBlocking().single().intValue(),
selectPid(dbr).toBlocking().single().intValue());
});
}

@Test
public void shouldBeSamePidWhenValidationQuerySucceeds() {
withDbr("SELECT 1", false, dbr -> {
// Just compare PIDs
assertEquals(selectPid(dbr).toBlocking().single().intValue(),
selectPid(dbr).toBlocking().single().intValue());
});
}

@Test
public void shouldFailValidationQueryFailsAndReconnectAfterSuccess() throws Exception {
String errSql =
"DO language plpgsql $$\n" +
" BEGIN\n" +
" IF (SELECT COUNT(1) FROM VSTATE) = 1 THEN\n" +
" RAISE 'ERR';\n" +
" END IF;\n" +
" EXCEPTION\n" +
" WHEN undefined_table THEN\n" +
" END\n" +
"$$;";
withDbr(errSql, false, dbr -> {
// Add the VSTATE table
dbr.query("DROP TABLE IF EXISTS VSTATE; CREATE TABLE VSTATE (ID VARCHAR(255) PRIMARY KEY)");

try {
// Grab the pid
int pid = selectPid(dbr).toBlocking().single();

// Break it
runFromOutside(dbr, "INSERT INTO VSTATE VALUES('A')");

// Make sure it is broken
try {
selectPid(dbr).toBlocking().single();
fail("Should be broken");
} catch (SqlException e) { }

// Fix it, and go ahead and expect the same PID
runFromOutside(dbr, "TRUNCATE TABLE VSTATE");
assertEquals(pid, selectPid(dbr).toBlocking().single().intValue());
} finally {
runFromOutside(dbr, "DROP TABLE IF EXISTS VSTATE");
}
});
}

@Test
public void shouldErrorWhenNotValidatingSocket() {
withDbr(null, false, dbr -> {
// Simple check, kill from outside, confirm failure
assertNotNull(selectPid(dbr).toBlocking().single());
killConnectionFromOutside(dbr);
try {
selectPid(dbr).toBlocking().single();
fail("Should not succeed after killing connection");
} catch (IllegalStateException e) { }
});
}

@Test
public void shouldNotErrorWhenValidatingSocket() {
withDbr(null, true, dbr -> {
// Grab pid, kill from outside, confirm different pid
int pid = selectPid(dbr).toBlocking().single();
killConnectionFromOutside(dbr);
assertNotEquals(pid, selectPid(dbr).toBlocking().single().intValue());
});
}

private static Observable<Integer> selectPid(DatabaseRule dbr) {
return dbr.db().queryRows("SELECT pg_backend_pid()").map(r -> r.getInt(0));
}

private static void killConnectionFromOutside(DatabaseRule dbr) {
ResultSet rs = runFromOutside(dbr, "SELECT pg_terminate_backend(pid) FROM pg_stat_activity " +
"WHERE pid <> pg_backend_pid() AND datname = '" + ((PgConnectionPool) dbr.pool).database + "'");
assertEquals(1, rs.size());
// Unfortunately, it appears we have to wait a tiny bit after
// killing the connection for netty to know
try { Thread.sleep(300); } catch (Exception e) { }
}

private static ResultSet runFromOutside(DatabaseRule dbr, String query) {
PgConnectionPool pool = (PgConnectionPool) dbr.pool;
try(Connection conn = new PgConnection(pool.openStream(pool.address), pool.dataConverter).
connect(pool.username, pool.password, pool.database).toBlocking().single()) {
return conn.querySet(query).toBlocking().single();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void withDbr(String validationQuery, boolean validateSocket, Consumer<DatabaseRule> fn) {
DatabaseRule rule = new DatabaseRule();
rule.builder.validationQuery(validationQuery);
rule.builder.validateSocket(validateSocket);
rule.before();
try {
fn.accept(rule);
} finally {
rule.after();
}
}
}