Skip to content

Commit

Permalink
Reactive Postgres support with pgasync 0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
sdeleuze committed Dec 2, 2015
1 parent 8e069c7 commit b90efcb
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 30 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies {

compile "org.mongodb:mongodb-driver-reactivestreams:1.1.0"
compile "com.couchbase.client:java-client:2.2.2"
compile "com.github.alaisi.pgasync:postgres-async-driver:0.6"
compile "com.github.alaisi.pgasync:postgres-async-driver:0.7"

compile "org.slf4j:slf4j-jcl:1.7.12"
compile "org.slf4j:jul-to-slf4j:1.7.12"
Expand Down
37 changes: 8 additions & 29 deletions src/main/java/playground/postgres/PostgresPersonRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@

package playground.postgres;

import java.util.Arrays;
import java.util.function.Consumer;

import com.github.pgasync.Db;
import com.github.pgasync.ResultSet;
import com.github.pgasync.Row;
import playground.Person;
import rx.Observable;

Expand All @@ -43,32 +38,16 @@ public PostgresPersonRepository(Db db) {

public Observable<Void> insert(Observable<Person> personStream) {
return personStream.flatMap(p ->
Observable.create(subscriber -> {
Consumer<ResultSet> onSuccess = result -> {
subscriber.onNext(result);
subscriber.onCompleted();
};
Consumer<Throwable> onError = throwable -> subscriber.onError(throwable);
db.query("insert into persons(firstname, lastname, address, postalCode, city) values($1, $2, $3, $4, $5)",
Arrays.asList(p.getFirstname(), p.getFirstname(), p.getAddress(), p.getPostalCode(), p.getCity()), onSuccess, onError);
// See https://github.com/ReactiveX/RxJava/issues/3037 about Observable<Void>
})).flatMap(document -> Observable.empty());
db.querySet("insert into persons(firstname, lastname, address, postalCode, city) values($1, $2, $3, $4, $5)",
p.getFirstname(), p.getFirstname(), p.getAddress(), p.getPostalCode(), p.getCity())
// See https://github.com/ReactiveX/RxJava/issues/3037 about Observable<Void>
).flatMap(document -> Observable.empty());
}

public Observable<Person> list() {
return Observable.create(subscriber -> {
Consumer<ResultSet> onSuccess = resultSet -> {
// We can only get the multiple rows at the same time, it would be better to retreive them
// with an Observable<Row> or a Publisher<Row>. More details on the related feature request at
// https://github.com/alaisi/postgres-async-driver/issues/4#issuecomment-160980796
for (Row row : resultSet) {
subscriber.onNext(new Person(row.getString("firstname"), row.getString("lastname"),
row.getString("address"), row.getString("postalCode"), row.getString("city")));
}
subscriber.onCompleted();
};
Consumer<Throwable> onError = throwable -> subscriber.onError(throwable);
db.query("select * from persons", onSuccess, onError);
});
return db.queryRows("select * from persons").map(row ->
new Person(row.getString("firstname"), row.getString("lastname"),
row.getString("address"), row.getString("postalCode"), row.getString("city"))
);
}
}

0 comments on commit b90efcb

Please sign in to comment.