diff --git a/dbms/src/test/java/org/polypheny/db/postgres/PGInterfaceIntegrationTests.java b/dbms/src/test/java/org/polypheny/db/postgres/PGInterfaceIntegrationTests.java new file mode 100644 index 0000000000..2aea385095 --- /dev/null +++ b/dbms/src/test/java/org/polypheny/db/postgres/PGInterfaceIntegrationTests.java @@ -0,0 +1,267 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgres; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.polypheny.db.TestHelper; +import org.polypheny.db.catalog.Catalog; +import org.polypheny.db.catalog.entity.CatalogTable; +import org.polypheny.db.catalog.exceptions.UnknownSchemaException; +import org.polypheny.db.catalog.exceptions.UnknownTableException; + +//import static org.polypheny.db.postgresql.PGInterfaceInboundCommunicationHandler.ctx; + + +/** + * Tests the implementation of the PGInterface --> simulates a client that connects via JDBC + */ +@Slf4j +public class PGInterfaceIntegrationTests { + + //select: SELECT * FROM public.PGInterfaceTestTable + + //insert: INSERT INTO public.PGInterfaceTestTable(PkIdTest, VarcharTest, IntTest) VALUES (1, 'Franz', 1), (2, 'Hello', 2), (3, 'By', 3); + //create table: CREATE TABLE public.PGInterfaceTestTable(PkIdTest INTEGER NOT NULL, VarcharTest VARCHAR(255), IntTest INTEGER,PRIMARY KEY (PkIdTest)) + + + /** + * starts an instance of Polypheny and creates a PGInterface, as it does not exist per default + * + * @throws SQLException + */ + @BeforeClass + public static void start() throws SQLException { + // Ensures that Polypheny-DB is running + //noinspection ResultOfMethodCallIgnored + + TestHelper.getInstance(); + + try ( TestHelper.JdbcConnection polyphenyDbConnection = new TestHelper.JdbcConnection( false ) ) { + Connection connection = polyphenyDbConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + statement.executeUpdate( "ALTER INTERFACES ADD \"pgtestinterface\" USING 'org.polypheny.db.postgresql.PGInterface' WITH '{\"port\":\"5433\"}'" ); + } + } + + } + + + /** + * Cleans up after the tests + * + * @throws SQLException + */ + @AfterClass + public static void stop() throws SQLException { + + try ( PsqlJdbcConnection psqlJdbcConnection = new PsqlJdbcConnection( false ) ) { + Connection connection = psqlJdbcConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + //statement.executeUpdate( "ALTER INTERFACES DROP pgtestinerface" ); + } + } + } + + + /** + * Test that executes the ddl command CREATE and checks within the database if a table was created + * + * @throws SQLException + */ + @Test + public void testIfDDLIsExecuted() throws SQLException { + + try ( PsqlJdbcConnection psqlJdbcConnection = new PsqlJdbcConnection( false ) ) { + Connection connection = psqlJdbcConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + statement.executeUpdate( "CREATE TABLE pginterfacetesttable(PkIdTest INTEGER NOT NULL, VarcharTest VARCHAR(255), IntTest INTEGER,PRIMARY KEY (PkIdTest))" ); + CatalogTable catalogTable = Catalog.getInstance().getTable( Catalog.getInstance().getSchema( Catalog.defaultDatabaseId, "public" ).id, "PGInterfaceTestTable" ); + assertEquals( catalogTable.name, "pginterfacetesttable" ); + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + } catch ( UnknownTableException e ) { + e.printStackTrace(); + } catch ( UnknownSchemaException e ) { + e.printStackTrace(); + } + } + } + + + /** + * This test executes several SQL-commands via the client, it creates a table, inserts and selects from it. The returned values from the select are tested + * + * @throws SQLException + */ + @Test + public void testIfDMLandDDLandDQLIsExecuted() throws SQLException { + + try ( PsqlJdbcConnection psqlJdbcConnection = new PsqlJdbcConnection( false ) ) { + Connection connection = psqlJdbcConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + statement.executeUpdate( "CREATE TABLE pginterfacetesttable(PkIdTest INTEGER NOT NULL, VarcharTest VARCHAR(255), IntTest INTEGER,PRIMARY KEY (PkIdTest))" ); + statement.executeUpdate( "INSERT INTO pginterfacetesttable(PkIdTest, VarcharTest, IntTest) VALUES (1, 'Franz', 1), (2, 'Hello', 2), (3, 'By', 3);" ); + ResultSet rs = statement.executeQuery( "SELECT * FROM pginterfacetesttable;" ); + + TestHelper.checkResultSet( rs, ImmutableList.of( new Object[]{ 1, "Franz", 1 }, new Object[]{ 2, "Hello", 2 }, new Object[]{ 3, "By", 3 } ) ); + + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + } + } + } + + + /** + * Tests if a prepared statement is correctly executed if the PREPARE and EXECUTE statement are sent seperately + * + * @throws SQLException + */ + @Test + public void testPreparedAndExecuteInTwoParts() throws SQLException { + + try ( PsqlJdbcConnection psqlJdbcConnection = new PsqlJdbcConnection( false ) ) { + Connection connection = psqlJdbcConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + statement.executeUpdate( "CREATE TABLE pginterfacetesttable(PkIdTest INTEGER NOT NULL, VarcharTest VARCHAR(255), IntTest INTEGER,PRIMARY KEY (PkIdTest))" ); + + //ResultSet rss = statement.executeQuery("PREPARE lol (int) AS SELECT empid FROM public.emps WHERE empid = $1; EXECUTE lol (100);"); + statement.executeUpdate( "PREPARE testPrepare (int, text, int) AS INSERT INTO pginterfacetesttable(PkIdTest, VarcharTest, IntTest) VALUES ($1, $2, $3);" ); + statement.executeUpdate( "EXECUTE testPrepare (1, 'Franz', 1);" ); + ResultSet rs = statement.executeQuery( "SELECT * FROM pginterfacetesttable;" ); + + TestHelper.checkResultSet( rs, ImmutableList.of( new Object[]{ 1, "Franz", 1 } ) ); + + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + } + } + } + + + /** + * Tests if a prepared statement is correctly executed if the PREPARE and EXECUTE statement are sent together + * + * @throws SQLException + */ + @Test + public void testPreparedAndExecuteInOnePart() throws SQLException { + + try ( PsqlJdbcConnection psqlJdbcConnection = new PsqlJdbcConnection( false ) ) { + Connection connection = psqlJdbcConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + statement.executeUpdate( "CREATE TABLE pginterfacetesttable(PkIdTest INTEGER NOT NULL, VarcharTest VARCHAR(255), IntTest INTEGER,PRIMARY KEY (PkIdTest))" ); + + //ResultSet rss = statement.executeQuery("PREPARE lol (int) AS SELECT empid FROM public.emps WHERE empid = $1; EXECUTE lol (100);"); + statement.executeUpdate( "PREPARE testPrepare (int, text, int) AS INSERT INTO pginterfacetesttable(PkIdTest, VarcharTest, IntTest) VALUES ($1, $2, $3); EXECUTE testPrepare (1, 'Franz', 1);" ); + ResultSet rs = statement.executeQuery( "SELECT * FROM pginterfacetesttable;" ); + + TestHelper.checkResultSet( rs, ImmutableList.of( new Object[]{ 1, "Franz", 1 } ) ); + + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + } + } + } + + + /** + * This feature is not yet supported, but it tests if prepared statement are executed correctly using the JDBC framework + * + * @throws SQLException + */ + @Test + @Ignore + public void testPreparedUsingJdbc() throws SQLException { + //TODO(FF): Prepared Statements using JDBC not yet supported from PGInterface --> read inserted values from bind command (which is not done currently) + + try ( PsqlJdbcConnection psqlJdbcConnection = new PsqlJdbcConnection( false ) ) { + Connection connection = psqlJdbcConnection.getConnection(); + try ( Statement statement = connection.createStatement() ) { + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + statement.executeUpdate( "CREATE TABLE pginterfacetesttable(PkIdTest INTEGER NOT NULL, IntTest INTEGER,PRIMARY KEY (PkIdTest))" ); + + PreparedStatement pst = connection.prepareStatement( "INSERT INTO pginterfacetesttable(PkIdTest, IntTest) VALUES (?, ?)" ); + pst.setInt( 1, 100 ); + pst.execute(); + ResultSet rs = statement.executeQuery( "SELECT * FROM pginterfacetesttable;" ); + + TestHelper.checkResultSet( rs, ImmutableList.of( new Object[]{ 1, 100 } ) ); + + statement.executeUpdate( "DROP TABLE IF EXISTS public.pginterfacetesttable" ); + } + } + } + + + /** + * Creates a connection via Postgres-JDBC, autocommit is always enabled, and sslmode disabled + */ + public static class PsqlJdbcConnection implements AutoCloseable { + + private final static String dbHost = "localhost"; + private final static int port = 5433; + + private final Connection conn; + + + public PsqlJdbcConnection( boolean autoCommit ) throws SQLException { + try { + Class.forName( "org.postgresql.Driver" ); + } catch ( ClassNotFoundException e ) { + log.error( "PostgreSQL JDBC Driver not found", e ); + } + final String url = "jdbc:postgresql://" + dbHost + ":" + port + "/"; + log.debug( "Connecting to database @ {}", url ); + + Properties connectionProps = new Properties(); + connectionProps.setProperty( "sslmode", "disable" ); + conn = DriverManager.getConnection( url, connectionProps ); + + //conn.setAutoCommit( autoCommit ); + } + + + public Connection getConnection() { + return conn; + } + + + @Override + public void close() throws SQLException { + //conn.commit(); + conn.close(); + } + + } + +} diff --git a/docker-compose.yml b/docker-compose.yml index e87cf3c3c2..2605a37921 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,15 +1,15 @@ version: "3.4" services: - remote-api: - image: polypheny/polypheny-connector - restart: unless-stopped - container_name: polypheny-connector - ports: - - "2376:443" - environment: - - CREATE_CERTS_WITH_PW=supersecret - - CERT_HOSTNAME=localhost + remote-api: + image: polypheny/polypheny-connector + restart: unless-stopped + container_name: polypheny-connector + ports: + - "2376:443" + environment: + - CREATE_CERTS_WITH_PW=supersecret + - CERT_HOSTNAME=localhost - CERT_EXPIRATION_DAYS=3650 - volumes: - - ${POLYPHENY_HOME:-~}/.polypheny/certs/localhost:/data/certs - - /var/run/docker.sock:/var/run/docker.sock:ro \ No newline at end of file + volumes: + - ${POLYPHENY_HOME:-~}/.polypheny/certs/localhost:/data/certs + - /var/run/docker.sock:/var/run/docker.sock:ro \ No newline at end of file diff --git a/plugins/postgresql-interface/build.gradle b/plugins/postgresql-interface/build.gradle new file mode 100644 index 0000000000..6d5adbd8af --- /dev/null +++ b/plugins/postgresql-interface/build.gradle @@ -0,0 +1,52 @@ +group "org.polypheny" + + +configurations { + tests { + extendsFrom testRuntimeOnly + } +} + +dependencies { + compileOnly project(":core") + compileOnly project(":monitoring") + + ////// NETTY + // https://mvnrepository.com/artifact/io.netty/netty-all + implementation group: 'io.netty', name: 'netty-all', version: '4.1.24.Final' + + + // --- Test Compile --- + testImplementation project(path: ":core", configuration: "tests") + + testImplementation group: "junit", name: "junit", version: junit_version +} + + + + +/** + * Tests + */ +test { + maxHeapSize = "2g" // Increase heap size (default is 512MB) +} + +/** + * JARs + */ +jar { + manifest { + attributes "Manifest-Version": "1.0" + attributes "Copyright": "The Polypheny Project (polypheny.org)" + attributes "Version": "$project.version" + } +} +java { + withJavadocJar() + withSourcesJar() +} + +licensee { + allow('Apache-2.0') +} diff --git a/plugins/postgresql-interface/gradle.properties b/plugins/postgresql-interface/gradle.properties new file mode 100644 index 0000000000..fa142a538a --- /dev/null +++ b/plugins/postgresql-interface/gradle.properties @@ -0,0 +1,27 @@ +# +# Copyright 2019-2023 The Polypheny Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pluginVersion = 0.0.1 + +pluginId = postgres-interface +pluginClass = org.polypheny.db.postgresql.PostgresqlInterfacePlugin +pluginProvider = The Polypheny Project +pluginDependencies = +pluginUrlPath = +pluginCategories = interface +pluginPolyDependencies = +pluginIsSystemComponent = false +pluginIsUiVisible = true \ No newline at end of file diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterface.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterface.java new file mode 100644 index 0000000000..3914c4bcc3 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterface.java @@ -0,0 +1,227 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + + +import com.google.common.collect.ImmutableList; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; +import org.polypheny.db.StatusService; +import org.polypheny.db.iface.Authenticator; +import org.polypheny.db.iface.QueryInterface; +import org.polypheny.db.information.InformationGroup; +import org.polypheny.db.information.InformationManager; +import org.polypheny.db.information.InformationPage; +import org.polypheny.db.information.InformationTable; +import org.polypheny.db.languages.QueryLanguage; +import org.polypheny.db.transaction.TransactionManager; +import org.polypheny.db.util.Util; + + +/** + * First point of contact for the PGInterface, setting changes from the UI are handled here + */ +@Slf4j +public class PGInterface extends QueryInterface { + + @SuppressWarnings( "WeakerAccess" ) + public static final String INTERFACE_NAME = "Postgresql Interface"; + @SuppressWarnings( "WeakerAccess" ) + // TODO: Update description text + public static final String INTERFACE_DESCRIPTION = "PostgreSQL-based query interface - in development"; + @SuppressWarnings( "WeakerAccess" ) + public static final List AVAILABLE_SETTINGS = ImmutableList.of( + new QueryInterfaceSettingInteger( "port", false, true, false, 5432 ) + // new QueryInterfaceSettingInteger( "maxUploadSizeMb", false, true, true, 10000 ), + // new QueryInterfaceSettingList( "serialization", false, true, false, ImmutableList.of( "PROTOBUF", "JSON" ) ) + // Possible to add more myself + ); + public static TransactionManager transactionManager = null; + + + private final int port; + private final String uniqueName; + + // Counters + private final Map statementCounters = new HashMap<>(); + + private final MonitoringPage monitoringPage; + + // Server things + private final EventLoopGroup bossGroup = new NioEventLoopGroup(); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + + + public PGInterface( TransactionManager transactionManager, Authenticator authenticator, int ifaceId, String uniqueName, Map settings ) { + super( transactionManager, authenticator, ifaceId, uniqueName, settings, true, true ); + this.uniqueName = uniqueName; + this.port = Integer.parseInt( settings.get( "port" ) ); + if ( !Util.checkIfPortIsAvailable( port ) ) { + // Port is already in use + throw new RuntimeException( "Unable to start " + INTERFACE_NAME + " on port " + port + "! The port is already in use." ); + } + // Add information page + monitoringPage = new MonitoringPage(); + PGInterface.transactionManager = transactionManager; + } + + + /** + * Creates a netty server and its channelPipeline + */ + @Override + public void run() { + + try { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group( bossGroup, workerGroup ) + .channel( NioServerSocketChannel.class ) + .childHandler( new ChannelInitializer() { + @Override + public void initChannel( SocketChannel socketChannel ) throws Exception { + ChannelPipeline channelPipeline = socketChannel.pipeline(); + + //Inbound + channelPipeline.addLast( "decoder", new StringDecoder() ); + + //Handler + channelPipeline.addLast( "handler", new PGInterfaceServerHandler( transactionManager ) ); + + + } + } ).option( ChannelOption.SO_BACKLOG, 128 ).childOption( ChannelOption.SO_KEEPALIVE, true ); + + // Start accepting incoming connections + ChannelFuture channelFuture = serverBootstrap.bind( port ).sync(); + + // Waits until server socket is closed --> introduces bugs --> polypheny not starting (without reset) and not displaying interface correctly + //channelFuture.channel().closeFuture().sync(); + + } catch ( Exception e ) { + log.error( "Exception while starting" + INTERFACE_NAME, e ); + + } + + StatusService.printInfo( String.format( "%s started and is listening on port %d.", INTERFACE_NAME, port ) ); + } + + + @Override + public List getAvailableSettings() { + return AVAILABLE_SETTINGS; + } + + + @Override + public void shutdown() { + //TODO(FF): end things from run() --> already done?? + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + monitoringPage.remove(); + } + + + @Override + public String getInterfaceType() { + return INTERFACE_NAME; + } + + + @Override + protected void reloadSettings( List updatedSettings ) { + //TODO(FF): if settings are mutable, change it here (can make them mutable) + //nothing in avatica/http interface + } + + + @Override + public void languageChange() { + + } + + + private class MonitoringPage { + //TODO(FF): vergliiche met anderne interfaces (zeigt infos em ui aah) --> sött glaubs ok sii?? + + private final InformationPage informationPage; + private final InformationGroup informationGroupRequests; + private final InformationTable statementsTable; + + + public MonitoringPage() { + InformationManager im = InformationManager.getInstance(); + + informationPage = new InformationPage( uniqueName, INTERFACE_NAME ).fullWidth().setLabel( "Interfaces" ); + informationGroupRequests = new InformationGroup( informationPage, "Requests" ); + + im.addPage( informationPage ); + im.addGroup( informationGroupRequests ); + + statementsTable = new InformationTable( + informationGroupRequests, + Arrays.asList( "Language", "Percent", "Absolute" ) + ); + statementsTable.setOrder( 2 ); + im.registerInformation( statementsTable ); + + informationGroupRequests.setRefreshFunction( this::update ); + } + + + //reload button + public void update() { + double total = 0; + for ( AtomicLong counter : statementCounters.values() ) { + total += counter.get(); + } + + DecimalFormatSymbols symbols = DecimalFormatSymbols.getInstance(); + symbols.setDecimalSeparator( '.' ); + DecimalFormat df = new DecimalFormat( "0.0", symbols ); + statementsTable.reset(); + for ( Map.Entry entry : statementCounters.entrySet() ) { + statementsTable.addRow( entry.getKey().getSerializedName(), df.format( total == 0 ? 0 : (entry.getValue().longValue() / total) * 100 ) + " %", entry.getValue().longValue() ); + } + } + + + public void remove() { + InformationManager im = InformationManager.getInstance(); + im.removeInformation( statementsTable ); + im.removeGroup( informationGroupRequests ); + im.removePage( informationPage ); + } + + } + +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceErrorFieldTypes.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceErrorFieldTypes.java new file mode 100644 index 0000000000..0e88a68d7e --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceErrorFieldTypes.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +/** + * The Fields which appear in Error and Notice Messages. They are usually followed by a String which contains the field value + * For more information see: https://www.postgresql.org/docs/current/protocol-error-fields.html + */ +public enum PGInterfaceErrorFieldTypes { + + /** + * Severity: the field contents are ERROR, FATAL, or PANIC (in an error message), or WARNING, NOTICE, DEBUG, INFO, + * or LOG (in a notice message), or a localized translation of one of these. Always present. + */ + S, + + /** + * Code: the SQLSTATE code for the error. Not localizable. Always present. + */ + C, + + /** + * Message: the primary human-readable error message. This should be accurate but terse (typically one line). Always present. + */ + M + + +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceErrorHandler.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceErrorHandler.java new file mode 100644 index 0000000000..d799a7fef9 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceErrorHandler.java @@ -0,0 +1,85 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import java.util.LinkedHashMap; +import lombok.extern.slf4j.Slf4j; + +/** + * Writes and sends error messages to the client + */ +@Slf4j +public class PGInterfaceErrorHandler { + + private final ChannelHandlerContext ctx; + private final PGInterfaceInboundCommunicationHandler pgInterfaceInboundCommunicationHandler; + private String errorMsg; + private Throwable exception; + private PGInterfaceServerWriter serverWriter; + + + /** + * Creates a error handler that can send error messages to the client + * + * @param ctx Is needed to send the error message to the designated client + * @param pgInterfaceInboundCommunicationHandler Is needed to create PGInterfaceServerWriter to be able to send a error message + */ + public PGInterfaceErrorHandler( ChannelHandlerContext ctx, PGInterfaceInboundCommunicationHandler pgInterfaceInboundCommunicationHandler ) { + this.ctx = ctx; + this.pgInterfaceInboundCommunicationHandler = pgInterfaceInboundCommunicationHandler; + } + + /** + * Sends a simple error message to the client. The severity and other error fields are all fixed. + * + * @param errorMsg The message you want to send + */ + public void sendSimpleErrorMessage( String errorMsg ) { + //Notes on how error messages are sent: + /* + E...n S ERROR. V ERROR. C 42P01. M relation "public.hihi" does not exist. P 15. F parse_relation. c. L 1360. R parserOpenTable. . Z....I + E...x S ERROR. V ERROR. C 42P01. M relation "public.hihi" does not exist. P 15. F parse_relation. c. L 1360. R parserOpenTable. . Z....I + header, length, severity, severity (gl wie vorher), SQLSTATE code, Message, (Position, File, column name?, Line, Routine, zerobyte as field) freiwillig + 42P01 - undefined_table$ + 0A000 - feature_not_supported + E..._SERROR.VERROR.C42601.Msyntax error at or near "SSELECT".P1.Fscan.l.L1176.Rscanner_yyerror..Z....I + */ + + this.errorMsg = errorMsg; + PGInterfaceMessage pgInterfaceMessage = new PGInterfaceMessage( PGInterfaceHeaders.E, "MockBody", 4, true ); + this.serverWriter = new PGInterfaceServerWriter( "MockType", pgInterfaceMessage, ctx, pgInterfaceInboundCommunicationHandler ); + + //TODO(FF): An error occurs because of the errormessage on the clientside. It doesn't really matter, because the connection would be terminated anyway and the message itself arrives... + LinkedHashMap errorFields = new LinkedHashMap(); + errorFields.put( 'S', "ERROR" ); + errorFields.put( 'V', "ERROR" ); + errorFields.put( 'C', "0A000" ); + errorFields.put( 'M', errorMsg ); + errorFields.put( 'P', "notImplemented" ); + errorFields.put( 'F', "polypheny" ); + errorFields.put( 'c', "" ); + errorFields.put( 'L', "1360" ); + errorFields.put( 'R', "parserOpenTable" ); + + + ByteBuf buffer = serverWriter.writeSimpleErrorMessage( errorFields ); + ctx.writeAndFlush( buffer ); + + } +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceHeaders.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceHeaders.java new file mode 100644 index 0000000000..a8a91f58a1 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceHeaders.java @@ -0,0 +1,121 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + + +/** + * All the different header, alphabetically and sender ordered. + * There exist some special cases (as numbers as header, or no header) + * For more information look at the Postgresql documentation: https://www.postgresql.org/docs/current/protocol-message-formats.html + */ +public enum PGInterfaceHeaders { + + //----------------------------------------------- server to client ------------------------------------------------ + + /** + * CommandComplete - from server to client + */ + C, + + /** + * DataRow - from server to client - identifies message as a data row + */ + D, + + /** + * ErrorResponse - from server to client - message is error + * Execute - from client to server - used in extended query cycle - Error Field Type not in Headers + */ + E, + + /** + * EmptyQueryResponse - from server to client - response to empty query String  substitutes for CommandComplete + */ + I, + + /** + * NoticeResponse - from server to client + */ + N, + + /** + * no data indicator - from server to client - indicator + */ + n, + + /** + * Authenticatio request - from server to client - (used by several different messages + */ + R, + + /** + * ParameterStatus message - from server to client - (shauld actually be) voluntary + */ + S, + + /** + * RowDescription - from server to client + */ + T, + + /** + * ParameterDescription - from server to client + */ + t, + + /** + * ReadyForQuery - from server to client - whenever backend is ready for new query cycle + */ + Z, + + /** + * ParseComplete - from server to client - indicator (actually sent as 1) + */ + ONE, + + /** + * BindComplete - from server to client - indicator + */ + TWO, + + /** + * CloseComplete - from server to client - indicator + */ + THREE, + + //----------------------------------------------- client to server ------------------------------------------------ + + //E - also used from server to client, described there + //some messages with no header (StartUpMessage, CancelREquest, and some authentication requests) + + /** + * Parse - contains query in the extended query cycle + */ + P, + + /** + * Simple Query - from client to server - used in simple query cycle + */ + Q, + + /** + * Termination message - from client to server + */ + X + + +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceInboundCommunicationHandler.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceInboundCommunicationHandler.java new file mode 100644 index 0000000000..dd7dcf7006 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceInboundCommunicationHandler.java @@ -0,0 +1,426 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import lombok.extern.slf4j.Slf4j; +import org.polypheny.db.transaction.TransactionManager; + +/** + * Manages all incoming communication, not a handler from netty, but called by one + */ +@Slf4j +public class PGInterfaceInboundCommunicationHandler { + + private final PGInterfaceErrorHandler errorHandler; + String type; + ChannelHandlerContext ctx; + TransactionManager transactionManager; + + + public PGInterfaceInboundCommunicationHandler( String type, ChannelHandlerContext ctx, TransactionManager transactionManager ) { + this.type = type; + this.ctx = ctx; + this.transactionManager = transactionManager; + this.errorHandler = new PGInterfaceErrorHandler( ctx, this ); + } + + + /** + * Decides in what cycle (from postgres message flow) the client is: startup-phase, query-phase, etc. + * + * @param oMsg the incoming message from the client (unchanged) - whole message is interpreted as a string from the netty decoder + * @param pgInterfaceServerHandler is needed to access the saved prepared statements for a connection + */ + public void decideCycle( Object oMsg, PGInterfaceServerHandler pgInterfaceServerHandler ) { + String msgWithZeroBits = (( String ) oMsg); + String wholeMsg = msgWithZeroBits.replace( "\u0000", "" ); + + //TODO(FF): simple query phase is not implemented + if ( wholeMsg.substring( 2, 6 ).contains( "user" ) ) { + startUpPhase(); + } else if ( wholeMsg.charAt( 0 ) == 'P' ) { + extendedQueryPhase( wholeMsg, pgInterfaceServerHandler ); + } else if ( wholeMsg.charAt( 0 ) == 'X' ) { + //TODO(FF): (low prio, bcs everything works as inteded, but) seems to never be reached, instead in PGInterfaceServerHandler the exception is reached... maybe client closes connection and netty realizes this and stops handler + terminateConnection(); + } else if ( wholeMsg.charAt( 0 ) == 'Q' ) { + simpleQueryPhase(); + } else { + errorHandler.sendSimpleErrorMessage( "The incoming message could not be parsed by the PGInterface." ); + } + } + + + /** + * Performs necessary steps on the first connection with the client (mostly sends necessary replies, but doesn't really set anything on the server side). + * Sends authenticationOk (without checking authentication), sets server version 14 (required from jdbc), sends readyForQuery + */ + public void startUpPhase() { + //authenticationOk + PGInterfaceMessage authenticationOk = new PGInterfaceMessage( PGInterfaceHeaders.R, "0", 8, false ); + PGInterfaceServerWriter authenticationOkWriter = new PGInterfaceServerWriter( "i", authenticationOk, ctx, this ); + ctx.writeAndFlush( authenticationOkWriter.writeOnByteBuf() ); + + //server_version (Parameter Status message) + PGInterfaceMessage parameterStatusServerVs = new PGInterfaceMessage( PGInterfaceHeaders.S, "server_version" + PGInterfaceMessage.getDelimiter() + "14", 4, true ); + PGInterfaceServerWriter parameterStatusServerVsWriter = new PGInterfaceServerWriter( "ss", parameterStatusServerVs, ctx, this ); + ctx.writeAndFlush( parameterStatusServerVsWriter.writeOnByteBuf() ); + + //ReadyForQuery + sendReadyForQuery( "I" ); + } + + + /** + * Handles the steps if we are in the simple query phase. + */ + public void simpleQueryPhase() { + //TODO(FF): (low priority) The simple query phase is handled a bit differently than the extended query phase. The most important difference is that the simple query phase accepts several queries at once and sends some different response messages (e.g. no parse/bindComplete). + //Several queries seperated with ";" + + errorHandler.sendSimpleErrorMessage( "The simple query phase is not implemented in the PostgreSQL Interface" ); + + } + + + /** + * Handles the steps if we are in the extended query phase. + * Sends necessary responses to client (without really setting anything in backend) and prepares the incoming query for usage. Continues query forward to QueryHandler + * + * @param incomingMsg unchanged incoming message (transformed to string by netty) + * @param pgInterfaceServerHandler is needed to access the saved prepared statements for a connection + */ + public void extendedQueryPhase( String incomingMsg, PGInterfaceServerHandler pgInterfaceServerHandler ) { + + if ( incomingMsg.startsWith( "SET", 2 ) ) { + //TODO(FF): actually handle the SET commands (e.g. SET extra_float_digits = 3) + sendParseBindComplete(); + sendCommandComplete( "SET", -1 ); + sendReadyForQuery( "I" ); + + } else if ( incomingMsg.startsWith( "PREPARE", 2 ) ) { + //TODO(FF): Hanlde prepared statements sent via JDBC framework - they don't contain the PREPARE and EXECUTE commands (don't necessarily handle it here) + ArrayList preparedStatementNames = pgInterfaceServerHandler.getPreparedStatementNames(); + + //Prepared Statement + String[] query = extractPreparedQuery( incomingMsg ); + String prepareString = query[0]; + String executeString = query[1]; + String prepareStringQueryName = extractPreparedQueryName( prepareString ); + + //check if name already exists + if ( preparedStatementNames.isEmpty() || (!preparedStatementNames.contains( prepareStringQueryName )) ) { + PGInterfacePreparedMessage preparedMessage = new PGInterfacePreparedMessage( prepareStringQueryName, prepareString, ctx ); + preparedMessage.prepareQuery(); + + //safe prepared messages for the connection + pgInterfaceServerHandler.addPreparedStatementNames( prepareStringQueryName ); + pgInterfaceServerHandler.addPreparedMessage( preparedMessage ); + + //send response to "prepare" query + sendParseBindComplete(); + sendNoData(); + sendCommandComplete( "PREPARE", -1 ); + + //check if an execute statement was sent along + if ( !executeString.isEmpty() ) { + String statementName = extractPreparedQueryName( executeString ); + + //check if name exists already + if ( preparedStatementNames.contains( statementName ) ) { + executePreparedStatement( executeString, statementName, pgInterfaceServerHandler ); + } else { + String errorMsg = "There does not exist a prepared statement called" + statementName; + errorHandler.sendSimpleErrorMessage( errorMsg ); + } + + } else { + sendReadyForQuery( "I" ); + } + } else { + String errorMsg = "There already exists a prepared statement with the name" + prepareStringQueryName + "which has not yet been executed"; + errorHandler.sendSimpleErrorMessage( errorMsg ); + } + + + } else if ( incomingMsg.startsWith( "EXECUTE", 2 ) ) { + ArrayList preparedStatementNames = pgInterfaceServerHandler.getPreparedStatementNames(); + + //get execute statement + String executeQuery = extractQuery( incomingMsg ); + String statementName = extractPreparedQueryName( executeQuery ); + + //check if name exists already + if ( preparedStatementNames.contains( statementName ) ) { + executePreparedStatement( executeQuery, statementName, pgInterfaceServerHandler ); + } else { + String errorMsg = "There does not exist a prepared statement called" + statementName; + errorHandler.sendSimpleErrorMessage( errorMsg ); + } + } else { + //"Normal" query + String query = extractQuery( incomingMsg ); + PGInterfaceQueryHandler queryHandler = new PGInterfaceQueryHandler( query, ctx, this, transactionManager ); + queryHandler.start(); + } + } + + /** + * Starts a query handler with the information that the query right now is a prepared query + * + * @param executeString The string which contains the execute query + * @param statementName The name of the prepared statement + * @param pgInterfaceServerHandler is needed to access the saved prepared statements for a connection + */ + private void executePreparedStatement( String executeString, String statementName, PGInterfaceServerHandler pgInterfaceServerHandler ) { + ArrayList preparedStatementNames = pgInterfaceServerHandler.getPreparedStatementNames(); + + // get corresponding prepared message + int idx = preparedStatementNames.indexOf( statementName ); + PGInterfacePreparedMessage preparedMessage = pgInterfaceServerHandler.getPreparedMessage( idx ); + preparedMessage.setExecuteString( executeString ); + preparedMessage.extractAndSetValues(); + + PGInterfaceQueryHandler queryHandler = new PGInterfaceQueryHandler( preparedMessage, ctx, this, transactionManager ); + queryHandler.start(); + } + + + /** + * Prepares (parses) the incoming message from the client, so it can be used in the context of polypheny + * NOTE: Some incoming messages from the client are disregarded (they are sent the same way all the time, if something unusual occurs, this is not handled yet, i.e. hardcoded to find the end of the query itself). + * + * @param incomingMsg unchanged incoming message from the client + * @return "normally" readable and usable query string + */ + public String extractQuery( String incomingMsg ) { + String query = ""; + //cut header + query = incomingMsg.substring( 2, incomingMsg.length() - 1 ); + + //find end of query --> normally it ends with combination of BDPES (are headers (some indicators from client), with some weird other bits in between) + //B starts immediately after query --> find position of correct B and end of query is found + byte[] byteSequence = { 66, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 68, 0, 0, 0, 6, 80, 0, 69, 0, 0, 0, 9 }; + String msgWithZeroBits = new String( byteSequence, StandardCharsets.UTF_8 ); + String endSequence = msgWithZeroBits.replace( "\u0000", "" ); + + String endOfQuery = query.substring( incomingMsg.length() - 20 ); + + int idx = incomingMsg.indexOf( endSequence ); + if ( idx != -1 ) { + query = query.substring( 0, idx - 2 ); + } else { + errorHandler.sendSimpleErrorMessage( "Something went wrong while extracting the query from the incoming stream" ); + //TODO(FF): does it continue to send stuff to the client? (even though he doesn't receives it anymore?) + } + + return query; + } + + + /** + * extracts the prepared query from the incoming string (cuts all other information/buffer things) + * + * @param incomingMsg the message from the client + * @return a list of 2 strings. the first element is the prepared query, the second element is empty (if not sent along) or contains the execute query + */ + private String[] extractPreparedQuery( String incomingMsg ) { + String prepareString = extractQuery( incomingMsg ); + String executeString = ""; + + + if ( incomingMsg.contains( "EXECUTE" ) ) { + executeString = extractExecutePart( incomingMsg ); + } + + String[] result = { prepareString, executeString }; + return result; + } + + /** + * Extracts the execute query if it was sent along with the prepare query + * + * @param incomingMsg the incoming message from the client + * @return returns the execute query + */ + private String extractExecutePart( String incomingMsg ) { + + int idx = incomingMsg.indexOf( "EXECUTE" ); + String executeStringWithBufferStuff = incomingMsg.substring( idx ); + String executeString = executeStringWithBufferStuff.split( "\\)" )[0]; + executeString = executeString + ")"; + + return executeString; + } + + /** + * Extracts the name of the prepared query (also of the execute query) + * + * @param cleanedQuery the prepared/execute query without remains from the buffer (zeros in between etc) + * @return the name of the prepare statement as a string + */ + private String extractPreparedQueryName( String cleanedQuery ) { + + String startNamePlusQuery = cleanedQuery.substring( 8 ); + String name = startNamePlusQuery.split( "\\(" )[0]; + + return name.replace( " ", "" ); + } + + + /** + * Creates and sends (flushes on ctx) a readyForQuery message with a tag. The tag is choosable (see below for options). + * + * @param msgBody tag - current transaction status indicator (possible vals: I (idle, not in transaction block), + * T (in transaction block), E (in failed transaction block, queries will be rejected until block is ended) + */ + public void sendReadyForQuery( String msgBody ) { + PGInterfaceMessage readyForQuery = new PGInterfaceMessage( PGInterfaceHeaders.Z, msgBody, 5, false ); + PGInterfaceServerWriter readyForQueryWriter = new PGInterfaceServerWriter( "c", readyForQuery, ctx, this ); + ctx.writeAndFlush( readyForQueryWriter.writeOnByteBuf() ); + } + + + /** + * creates and sends a parseComplete and a bindComplete message to the client. + */ + public void sendParseBindComplete() { + //TODO(FF): This should work with the normal PGInterfaceServerWriter type "i" (called like in the commented out part), + // but it does not --> roundabout solution that works, but try to figure out what went wrong... + + /* + //parseComplete + PGInterfaceMessage parseComplete = new PGInterfaceMessage(PGInterfaceHeaders.ONE, "0", 4, true); + PGInterfaceServerWriter parseCompleteWriter = new PGInterfaceServerWriter("i", parseComplete, ctx); + ctx.writeAndFlush(parseCompleteWriter.writeOnByteBuf()); + + //bindComplete + PGInterfaceMessage bindComplete = new PGInterfaceMessage(PGInterfaceHeaders.TWO, "0", 4, true); + PGInterfaceServerWriter bindCompleteWriter = new PGInterfaceServerWriter("i", bindComplete, ctx); + ctx.writeAndFlush(bindCompleteWriter.writeOnByteBuf()); + */ + + ByteBuf buffer = ctx.alloc().buffer(); + PGInterfaceMessage mockMessage = new PGInterfaceMessage( PGInterfaceHeaders.ONE, "0", 4, true ); + PGInterfaceServerWriter headerWriter = new PGInterfaceServerWriter( "i", mockMessage, ctx, this ); + buffer = headerWriter.writeIntHeaderOnByteBuf( '1' ); + buffer.writeBytes( headerWriter.writeIntHeaderOnByteBuf( '2' ) ); + ctx.writeAndFlush( buffer ); + + } + + + /** + * creates and sends the noData message to the client + */ + public void sendNoData() { + PGInterfaceMessage noData = new PGInterfaceMessage( PGInterfaceHeaders.n, "0", 4, true ); + PGInterfaceServerWriter noDataWriter = new PGInterfaceServerWriter( "i", noData, ctx, this ); + ctx.writeAndFlush( noDataWriter.writeIntHeaderOnByteBuf( 'n' ) ); + } + + + /** + * Sends CommandComplete to client, with choosable command type + * + * @param command which command is completed (no space afterwards, space is added here) + * @param rowsAffected number of rows affected (if it is not necessary to send a number, put -1, (0 is also possible)) + */ + public void sendCommandComplete( String command, int rowsAffected ) { + String body = ""; + PGInterfaceMessage commandComplete; + PGInterfaceServerWriter commandCompleteWriter; + + if ( rowsAffected == -1 ) { + body = command; + + } else { + body = command + " " + rowsAffected; + } + + commandComplete = new PGInterfaceMessage( PGInterfaceHeaders.C, body, 4, true ); + commandCompleteWriter = new PGInterfaceServerWriter( "s", commandComplete, ctx, this ); + ctx.writeAndFlush( commandCompleteWriter.writeOnByteBuf() ); + } + + + /** + * Prepares everything to send rowDescription + * + * @param numberOfFields how many fields are in a row of the result + * @param valuesPerCol The values that should be sent for each field (information about each column) + */ + public void sendRowDescription( int numberOfFields, ArrayList valuesPerCol ) { + String body = String.valueOf( numberOfFields ); + PGInterfaceMessage rowDescription = new PGInterfaceMessage( PGInterfaceHeaders.T, body, 4, true ); //the length here doesn't really matter, because it is calculated seperately in writeRowDescription + PGInterfaceServerWriter rowDescriptionWriter = new PGInterfaceServerWriter( "i", rowDescription, ctx, this ); + ctx.writeAndFlush( rowDescriptionWriter.writeRowDescription( valuesPerCol ) ); + } + + + /** + * Prepares everything to send DataRows, with its corresponding needed information + * + * @param data data that should be sent + */ + public void sendDataRow( ArrayList data ) { + int noCols = data.size(); //number of rows returned TODO(FF,low priority): rename every occurence + String colVal = ""; //The value of the result + int colValLength = 0; //length of the colVal - can be 0 and -1 (-1= NULL is colVal) + String body = ""; //combination of colVal and colValLength + int nbrFollowingColVal = data.get( 0 ).length; + + PGInterfaceMessage dataRow; + PGInterfaceServerWriter dataRowWriter; + + for ( int i = 0; i < noCols; i++ ) { + + for ( int j = 0; j < nbrFollowingColVal; j++ ) { + + colVal = data.get( i )[j]; + + //TODO(FF): How is null safed in polypheny exactly?? is it correctly checked? + if ( colVal == "NULL" ) { + colValLength = -1; + //no body should be sent + break; + } else { + colValLength += colVal.length(); + body += colVal.length() + PGInterfaceMessage.getDelimiter() + colVal + PGInterfaceMessage.getDelimiter(); + } + } + dataRow = new PGInterfaceMessage( PGInterfaceHeaders.D, body, colValLength, false ); + dataRowWriter = new PGInterfaceServerWriter( "dr", dataRow, ctx, this ); + ctx.writeAndFlush( dataRowWriter.writeOnByteBuf() ); + body = ""; + } + } + + + /** + * Closes the ctx, so nothing more can be sent to the client + */ + public void terminateConnection() { + ctx.close(); + } +} + + diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceMessage.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceMessage.java new file mode 100644 index 0000000000..1e5c2cb027 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceMessage.java @@ -0,0 +1,184 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Contains information for what will be sent to the client, and also methods to handle the information + */ +@Slf4j +public class PGInterfaceMessage { + + private static final char delimiter = '§'; //for the subparts + private final boolean defaultLength; + private PGInterfaceHeaders header; + private String msgBody; + private int length; //default is 4, if a different length is mentioned in protocol, this is given + private PGInterfaceErrorHandler errorHandler; + + + /** + * Creates a PG-Message. It contains all relevant information to send a message to the client + * + * @param header What header should be sent (depends on message-type) + * @param msgBody The message itself + * @param length The length of the message (the length itself is included) + * @param defaultLength The length of the length sent (which is included). Length is 4 + */ + public PGInterfaceMessage( PGInterfaceHeaders header, String msgBody, int length, boolean defaultLength ) { + this.header = header; + this.msgBody = msgBody; + this.length = length; + this.defaultLength = defaultLength; + } + + /** + * Get what delimiter is currently set + * + * @return the current delimiter as string + */ + public static String getDelimiter() { + String del = String.valueOf( delimiter ); + return del; + } + + /** + * get the header of the message + * + * @return PGInterfaceHeader of the message + */ + public PGInterfaceHeaders getHeader() { + return this.header; + } + + public void setHeader( PGInterfaceHeaders header ) { + this.header = header; + } + + /** + * returns the PGInterfaceHeader of the Message as a char + * + * @return PGInterfaceHeader as a char + */ + public char getHeaderChar() { + + //if header is a single character + if ( header != PGInterfaceHeaders.ONE && header != PGInterfaceHeaders.TWO && header != PGInterfaceHeaders.THREE ) { + String headerString = header.toString(); + return headerString.charAt( 0 ); + } + //if header is a number + //TODO(FF): make a nicer version of this... if you cast headerInt to char directly it returns '\u0001' and not '1' + else { + int headerInt = getHeaderInt(); + if ( headerInt == 1 ) { + return '1'; + } else if ( headerInt == 2 ) { + return '2'; + } else if ( headerInt == 3 ) { + return '3'; + } + } + //TODO(FF): does it continue to send things to the client after the error message? + errorHandler.sendSimpleErrorMessage( "PGInterface>PGInterfaceMessage>getHeaderChar: This should never be reached." ); + + return 0; + } + + /** + * Changes the three headers that are a number and not a letter into a number (they are safed as a string in the PGInterfaceHeaders) + * + * @return 1, 2 or 3 - headers which are numbers + */ + public int getHeaderInt() { + String headerString = header.toString(); + if ( headerString.equals( "ONE" ) ) { + return 1; + } else if ( headerString.equals( "TWO" ) ) { + return 2; + } else if ( headerString.equals( "THREE" ) ) { + return 3; + } + //TODO(FF): does it continue to send things to the client after the error message? + errorHandler.sendSimpleErrorMessage( "PGInterface>PGInterfaceMessage>getHeaderInt: This should never be reached." ); + return 0; + } + + /** + * the length that should be set in the message to the client (default is 4) + * + * @return length of the message + */ + public int getLength() { + return this.length; + } + + /** + * set the length of the message to the client (the default is set to 4) + * + * @param length length you want to set as the message length + */ + public void setLength( int length ) { + this.length = length; + } + + /** + * if the message has the default length (4) + * + * @return whether the message has the default length + */ + public boolean isDefaultLength() { + return this.defaultLength; + } + + /** + * The content of the message that will be sent to the client, can contain several "sub" messages (message fields) which are seperated by the delimiter + * + * @return message to the client + */ + public String getMsgBody() { + return msgBody; + } + + /** + * Set the content of the message that will be sent to the client, can contain several "sub" messages (message fields) which are seperated by the delimiter + * + * @param msgBody message to the client + */ + public void setMsgBody( String msgBody ) { + this.msgBody = msgBody; + } + + /** + * Gets the different sub-parts of a message, which are seperated by the delimiter + * + * @param part the index of the requested part(s), starting at 0 + * @return a string array with each requested part + */ + public String[] getMsgPart( int[] part ) { + String[] subStrings = msgBody.split( getDelimiter() ); + String[] result = new String[part.length]; + + for ( int i = 0; i < (part.length); i++ ) { + result[i] = subStrings[i]; + } + return result; + } + +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfacePreparedMessage.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfacePreparedMessage.java new file mode 100644 index 0000000000..e0d1913d05 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfacePreparedMessage.java @@ -0,0 +1,319 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.polypheny.db.algebra.type.AlgDataType; +import org.polypheny.db.transaction.Statement; +import org.polypheny.db.type.PolyType; + +/** + * Contains information for prepared queries, and also methods to handle the information + */ +@Slf4j +public class PGInterfacePreparedMessage { + private static final String executeDelimiter = ", "; + private final String name; + private final ChannelHandlerContext ctx; + private final Map typesPolyphey = new HashMap(); + private final List> valuesPolypeny = new ArrayList>(); + private String query; + private String wholePrepareString; + private String executeString; + private List dataTypes; + private List data; + private PGInterfaceErrorHandler errorHandler; + + + /** + * Creates the message itself + * + * @param name name of the prepared statement + * @param wholePrepareString the string which contains the prepared statement + * @param ctx channelHandlerContext from the current connection + */ + public PGInterfacePreparedMessage( String name, String wholePrepareString, ChannelHandlerContext ctx ) { + this.name = name; + this.wholePrepareString = wholePrepareString; + this.ctx = ctx; + } + + /** + * Creates the message itself + * + * @param name name of the prepared statement + * @param ctx channelHandlerContext from the current connection + */ + public PGInterfacePreparedMessage( String name, ChannelHandlerContext ctx ) { + this.name = name; + this.ctx = ctx; + } + + /** + * The delimiter that seperates the values in the execute query + * + * @return a string sequence which is the delimiter in the execute query + */ + public static String getExecuteDelimiter() { + return executeDelimiter; + } + + /** + * the "whole" execute query + * + * @param executeString execute query + */ + public void setExecuteString( String executeString ) { + this.executeString = executeString; + } + + /** + * the "whole" prepare query + * + * @param wholePrepareString prepare query + */ + public void setWholePrepareString( String wholePrepareString ) { + this.wholePrepareString = wholePrepareString; + } + + /** + * The "pure" query (without prepare etc.) + * + * @return the query itself (without prepare etc.) + */ + public String getQuery() { + return query; + } + + /** + * Sets the query itself (without prepare etc) + * + * @param query The "pure" query + */ + public void setQuery( String query ) { + this.query = query; + } + + public List getDataTypes() { + return dataTypes; + } + + /** + * the data types from the prepare query + * + * @param dataTypes a list if all types (in the right order) from the prepare query + */ + private void setDataTypes( List dataTypes ) { + this.dataTypes = dataTypes; + } + + /** + * Gets the values from the execute query + * + * @return values from the execute query as string + */ + public List getData() { + return data; + } + + /** + * The values that will be inserted into the prepare query + * + * @param data the values that will be inserted into the prepare query as string (since they arrive as string from the connection) + */ + private void setData( List data ) { + this.data = data; + } + + /** + * From the execute string it extracts the values that will be inserted into the prepare query, sets these values in the message + */ + public void extractAndSetValues() { + String onlyExecuteValues = executeString.split( "\\(|\\)" )[1]; + List valueList = Arrays.asList( onlyExecuteValues.split( getExecuteDelimiter() ) ); + setData( valueList ); + } + + /** + * From the prepare string it extracts the data types for the values to be inserted, sets these in the message + */ + public void extractAndSetTypes() { + String types = wholePrepareString.split( "\\(|\\)" )[1]; + List typeList = Arrays.asList( types.split( getExecuteDelimiter() ) ); + + //replace all bool with boolean to match polypheny dt + if ( typeList.contains( "bool" ) || typeList.contains( "BOOL" ) ) { + ListIterator iterator = typeList.listIterator(); + while ( iterator.hasNext() ) { + String next = iterator.next(); + if ( next.equals( "bool" ) ) { + typeList.set( iterator.nextIndex() - 1, "BOOLEAN" ); + } + } + } + setDataTypes( typeList ); + } + + /** + * in the prepared query it changes the parameter symbol from $ (from postgres) to ? (for polypheny), sets it in this message + */ + public void changeParameterSymbol() { + + String[] parts = wholePrepareString.split( "\\$" ); + String newPrepareString = ""; + + for ( int i = 1; i < parts.length; i++ ) { + newPrepareString = newPrepareString + "?" + parts[i].substring( 1 ); + } + + setWholePrepareString( parts[0] + newPrepareString ); + + } + + /** + * extracts the "pure" query from the prepared string (so the query itself without anything else), sets it in this message + */ + public void extractAndSetQuery() { + String query = wholePrepareString.split( "AS " )[1]; + setQuery( query ); + } + + /** + * executes steps to prepare a query to be processed by polypheny + * steps include: extract the types from the prepared query, changes the parameter symbol from $ to ?, extracts the query itself + */ + public void prepareQuery() { + changeParameterSymbol(); + extractAndSetTypes(); + extractAndSetQuery(); + } + + /** + * Sets the parameter values (the datatype and the value) to the data context, so polypheny can process the prepared query + * + * @param statement the statement that was created from the query + */ + public void transformDataAndAddParameterValues( Statement statement ) { + long idx = 0; + + //if o is as long as the number of data types + for ( String type : dataTypes ) { + List o = new ArrayList<>(); + for ( int i = 0; i < data.size(); i++ ) { + if ( i % dataTypes.size() == ( int ) idx ) { + String value = data.get( i ); + o.add( transformData( value, type ) ); + } + } + AlgDataType algDataType = transformToAlgDataType( type, statement ); + statement.getDataContext().addParameterValues( idx, algDataType, o ); + idx++; + } + + /* + // if o is as long as all the number of data + for (int i = 0; i o = new ArrayList<>(); + String value = data.get((int) idx); + String type = dataTypes.get(i%dataTypes.size()); + o.add(transformData(value, type)); + AlgDataType algDataType = transformToAlgDataType(type, statement); + statement.getDataContext().addParameterValues(idx, algDataType, o); + idx ++; + } + + */ + + } + + /** + * Transforms the data from a string into the correct format + * + * @param value the value that needs to be transformed + * @param type the type the value needs to be transformed into + * @return returns the transformed value as an object + */ + private Object transformData( String value, String type ) { + Object o = new Object(); + switch ( type ) { + //TODO(FF): implement more data types + case "int": + o = Integer.valueOf( value ); + break; + case "text": + String pureValue = value; + if ( value.charAt( 0 ) == '\'' ) { + pureValue = value.substring( 1 ); + if ( value.charAt( value.length() - 1 ) == '\'' ) { + pureValue = pureValue.substring( 0, pureValue.length() - 1 ); + } + } + o = pureValue; + break; + case "bool": + o = Boolean.parseBoolean( value ); + break; + case "numeric": + o = Double.parseDouble( value ); + break; + default: + errorHandler.sendSimpleErrorMessage( "data type from from the prepared query is not yet supported by the postgres interface (transform data)" ); + break; + } + return o; + } + + /** + * creates a AlgDataType according to the corresponding data type + * + * @param type the type you want a AlgDataType for (types are the pgTypes) + * @param statement needed to create the AlgDataType + * @return returns the corresponding AlgDataType to the input type + */ + private AlgDataType transformToAlgDataType( String type, Statement statement ) { + AlgDataType result = null; + switch ( type ) { + //TODO(FF): implement more data types + case "int": + result = statement.getTransaction().getTypeFactory().createPolyType( PolyType.INTEGER ); + break; + case "text": + result = statement.getTransaction().getTypeFactory().createPolyType( PolyType.VARCHAR, 255 ); //TODO(FF): how do I know the precision? + break; + case "bool": + result = statement.getTransaction().getTypeFactory().createPolyType( PolyType.BOOLEAN ); + break; + case "numeric": + result = statement.getTransaction().getTypeFactory().createPolyType( PolyType.DECIMAL, 3, 3 ); + break; + default: + errorHandler.sendSimpleErrorMessage( "data type from from the prepared query is not yet supported by the postgres interface (create AlgDataType)" ); + break; + } + + return result; + } + + +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceQueryHandler.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceQueryHandler.java new file mode 100644 index 0000000000..96db8f24f1 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceQueryHandler.java @@ -0,0 +1,384 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import io.netty.channel.ChannelHandlerContext; +import java.sql.ResultSetMetaData; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.polypheny.db.PolyImplementation; +import org.polypheny.db.algebra.AlgRoot; +import org.polypheny.db.algebra.constant.Kind; +import org.polypheny.db.algebra.type.AlgDataTypeField; +import org.polypheny.db.catalog.Catalog; +import org.polypheny.db.catalog.exceptions.GenericCatalogException; +import org.polypheny.db.catalog.exceptions.UnknownDatabaseException; +import org.polypheny.db.catalog.exceptions.UnknownSchemaException; +import org.polypheny.db.catalog.exceptions.UnknownUserException; +import org.polypheny.db.config.RuntimeConfig; +import org.polypheny.db.languages.QueryLanguage; +import org.polypheny.db.languages.QueryParameters; +import org.polypheny.db.nodes.Node; +import org.polypheny.db.processing.Processor; +import org.polypheny.db.processing.QueryProcessor; +import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; +import org.polypheny.db.transaction.TransactionException; +import org.polypheny.db.transaction.TransactionManager; + + +/** + * Handles all queries from the extended query cycle - "sends" them to polypheny and processes answer + */ +@Slf4j +public class PGInterfaceQueryHandler { + + private String query; + private PGInterfacePreparedMessage preparedMessage; + private Boolean preparedQueryCycle = false; + private final ChannelHandlerContext ctx; + private final PGInterfaceInboundCommunicationHandler communicationHandler; + private final TransactionManager transactionManager; + private int rowsAffected = 0; //rows affected (changed/deleted/inserted/etc) + private List> rows; + private final PGInterfaceErrorHandler errorHandler; + + + public PGInterfaceQueryHandler( String query, ChannelHandlerContext ctx, PGInterfaceInboundCommunicationHandler communicationHandler, TransactionManager transactionManager ) { + this.query = query; + this.ctx = ctx; + this.communicationHandler = communicationHandler; + this.transactionManager = transactionManager; + this.errorHandler = new PGInterfaceErrorHandler( ctx, communicationHandler ); + } + + public PGInterfaceQueryHandler( PGInterfacePreparedMessage preparedMessage, ChannelHandlerContext ctx, PGInterfaceInboundCommunicationHandler communicationHandler, TransactionManager transactionManager ) { + this.preparedMessage = preparedMessage; + this.ctx = ctx; + this.communicationHandler = communicationHandler; + this.transactionManager = transactionManager; + preparedQueryCycle = true; + this.errorHandler = new PGInterfaceErrorHandler( ctx, communicationHandler ); + } + + /** + * Depending on how the PGInterfaceQueryHandler was created, it sets the query and starts the process of "sending" the query to polypheny + */ + public void start() { + if ( preparedQueryCycle ) { + this.query = preparedMessage.getQuery(); + } + sendQueryToPolypheny(); + + } + + + /** + * Forwards the message to Polypheny, and get result from it + */ + public void sendQueryToPolypheny() { + String type = ""; //query type according to answer tags + String commitStatus; + Transaction transaction; + Statement statement = null; + PolyImplementation result; + ArrayList data = new ArrayList<>(); + ArrayList header = new ArrayList<>(); + + try { + //get transaction and statement + transaction = transactionManager.startTransaction( Catalog.defaultUserId, Catalog.defaultDatabaseId, false, "Index Manager" ); + statement = transaction.createStatement(); + } catch ( UnknownDatabaseException | GenericCatalogException | UnknownUserException | UnknownSchemaException e ) { + //TODO(FF): will it continue to send things to the client? + errorHandler.sendSimpleErrorMessage( "Error while starting transaction" + e ); + throw new RuntimeException( "Error while starting transaction", e ); + } + + try { + if ( preparedQueryCycle ) { + preparedMessage.transformDataAndAddParameterValues( statement ); + } + + //get algRoot + Processor sqlProcessor = statement.getTransaction().getProcessor( QueryLanguage.from( "sql" ) ); + Node sqlNode = sqlProcessor.parse( query ).get( 0 ); + QueryParameters parameters = new QueryParameters( query, Catalog.NamespaceType.RELATIONAL ); + + if ( sqlNode.isA( Kind.DDL ) ) { + result = sqlProcessor.prepareDdl( statement, sqlNode, parameters ); + type = sqlNode.getKind().name(); + sendResultToClient( type, data, header ); + + } else { + AlgRoot algRoot = sqlProcessor.translate( + statement, + sqlProcessor.validate( statement.getTransaction(), sqlNode, RuntimeConfig.ADD_DEFAULT_VALUES_IN_INSERTS.getBoolean() ).left, + new QueryParameters( query, Catalog.NamespaceType.RELATIONAL ) ); + + //get PolyResult from AlgRoot + final QueryProcessor processor = statement.getQueryProcessor(); + result = processor.prepareQuery( algRoot, true ); + + //get type information + header = getHeader( result ); + + //get actual result of query in array + rows = result.getRows( statement, -1 ); + data = computeResultData( rows, header ); + + //type = result.getStatementType().toString(); + type = result.getKind().name(); + + transaction.commit(); + commitStatus = "Committed"; + + sendResultToClient( type, data, header ); + } + + } catch ( Throwable t ) { + List lol = null; + + //TODO(FF): will continue to send things to client after this? + String errorMsg = t.getMessage(); + errorHandler.sendSimpleErrorMessage( errorMsg ); + try { + transaction.rollback(); + commitStatus = "Rolled back"; + } catch ( TransactionException ex ) { + errorHandler.sendSimpleErrorMessage( "Error while rolling back" ); + commitStatus = "Error while rolling back"; + } + } + + } + + /** + * Gets the information for the header - Information for each column + * + * @param result the PolyImplementation the additional information is needed for + * @return a list with array, where: + * - array[0] = columnName + * - array[1] = columnType + * - array[2] = precision + */ + private ArrayList getHeader( PolyImplementation result ) { + ArrayList header = new ArrayList<>(); + for ( AlgDataTypeField metaData : result.getRowType().getFieldList() ) { + String columnName = metaData.getName(); + String dataType = metaData.getType().getPolyType().getTypeName(); //INTEGER, VARCHAR + int precision = metaData.getType().getPrecision(); //sizeVarChar, decimal places double + boolean nullable = metaData.getType().isNullable() == (ResultSetMetaData.columnNullable == 1); + + //For each column: If it should be filtered empty string if it should not be filtered + /* + String filter = ""; + if ( request.filter != null && request.filter.containsKey( columnName ) ) { + filter = request.filter.get( columnName ); + } + */ + + header.add( new String[]{ columnName, dataType, String.valueOf( precision ) } ); + } + return header; + } + + + /** + * Transforms the data into Strings. Possible to expand and change it into other datatypes + * + * @param rows The result-data as object-type + * @param header Header-data - additional information about the data (rows) + * @return the rows transformed accordingly (right now turned into a string) + */ + private ArrayList computeResultData( List> rows, ArrayList header ) { + //TODO(FF): Implement more Datatypes + ArrayList data = new ArrayList<>(); + + for ( List row : rows ) { + String[] temp = new String[row.size()]; + int counter = 0; + for ( Object o : row ) { + if ( o == null ) { + temp[counter] = null; + } else { + switch ( header.get( counter )[0] ) { + case "TIMESTAMP": + break; + case "DATE": + break; + case "TIME": + break; + case "FILE": + case "IMAGE": + case "SOUND": + case "VIDEO": + break; + //fall through + default: + temp[counter] = o.toString(); + } + if ( header.get( counter )[0].endsWith( "ARRAY" ) ) { + + } + } + counter++; + } + data.add( temp ); + } + + return data; + } + + + /** + * Prepares according to the query from the client what (and how) should be sent as a response + * + * @param type Type of the query (e.g.: Select, Insert, Create Table, etc.) + * @param data The data that needs to be sent to the client + * @param header Additional information for the data + */ + public void sendResultToClient( String type, ArrayList data, ArrayList header ) { + //TODO(FF): handle more responses to client + switch ( type ) { + case "INSERT": + case "DROP_TABLE": + case "TRUNCATE": + case "UPDATE": + communicationHandler.sendParseBindComplete(); + communicationHandler.sendCommandComplete( type, rowsAffected ); + communicationHandler.sendReadyForQuery( "I" ); + + break; + + case "CREATE_TABLE": + //1....2....n....C....CREATE TABLE.Z....I + communicationHandler.sendParseBindComplete(); + communicationHandler.sendCommandComplete( type, -1 ); + communicationHandler.sendReadyForQuery( "I" ); + + break; + + case "SELECT": + ArrayList valuesPerCol = new ArrayList(); + + //More info about these variables in javadoc for PGInterfaceServerWriter > writeRowDescription + String fieldName = ""; //string - column name (field name) (matters) + int objectIDTable = 0; //int32 - ObjectID of table (if col can be id'd to table) --> otherwise 0 (doesn't matter to client while sending) + int attributeNoCol = 0; //int16 - attr.no of col (if col can be id'd to table) --> otherwise 0 (doesn't matter to client while sending) + int objectIDColDataType = 0; //int32 - objectID of parameter datatype --> 0 = unspecified (doesn't matter to client while sending, but maybe later) - see comment below + int dataTypeSize = 0; //int16 - size of dataType (if formatCode = 1, this needs to be set for colValLength) (doesn't matter to client while sending) + int typeModifier = -1; //int32 - The value will generally be -1 (doesn't matter to client while sending) + int formatCode = 0; //int16 - 0: Text | 1: Binary --> sends everything with writeBytes(formatCode = 0), if sent with writeInt it needs to be 1 (matters) + + /* + There is no list for the OID's of the data types in the postgres documentation. + This list is a hardcoded list from the JDBC driver which contains all values. + One element in the list is a list of these elements: {pgName, OID, sqlType, javaClass, ?} + + private static final Object[][] types = new Object[][]{{"int2", 21, 5, "java.lang.Integer", 1005}, {"int4", 23, 4, "java.lang.Integer", 1007}, {"oid", 26, -5, "java.lang.Long", 1028}, {"int8", 20, -5, "java.lang.Long", 1016}, {"money", 790, 8, "java.lang.Double", 791}, {"numeric", 1700, 2, "java.math.BigDecimal", 1231}, {"float4", 700, 7, "java.lang.Float", 1021}, {"float8", 701, 8, "java.lang.Double", 1022}, {"char", 18, 1, "java.lang.String", 1002}, {"bpchar", 1042, 1, "java.lang.String", 1014}, {"varchar", 1043, 12, "java.lang.String", 1015}, {"text", 25, 12, "java.lang.String", 1009}, {"name", 19, 12, "java.lang.String", 1003}, {"bytea", 17, -2, "[B", 1001}, {"bool", 16, -7, "java.lang.Boolean", 1000}, {"bit", 1560, -7, "java.lang.Boolean", 1561}, {"date", 1082, 91, "java.sql.Date", 1182}, {"time", 1083, 92, "java.sql.Time", 1183}, {"timetz", 1266, 92, "java.sql.Time", 1270}, {"timestamp", 1114, 93, "java.sql.Timestamp", 1115}, {"timestamptz", 1184, 93, "java.sql.Timestamp", 1185}, {"refcursor", 1790, 2012, "java.sql.ResultSet", 2201}, {"json", 114, 1111, "org.postgresql.util.PGobject", 199}, {"point", 600, 1111, "org.postgresql.geometric.PGpoint", 1017}}; + + */ + + //data + int numberOfFields = header.size(); //int16 - number of fields (cols) (matters) + + for ( String[] head : header ) { + + fieldName = head[0]; + + //TODO(FF): Implement the rest of the cases - only Integer and varchar tested --> warn client? + switch ( head[1] ) { + case "BIGINT": + case "DOUBLE": + //TODO(FF): head[2] is the number of decimal places, is set to 3 in standard postgres ("dismissed in beginning, not checked what it actually is") + dataTypeSize = 8; //8 bytes signed + formatCode = 0; + objectIDColDataType = 20; + break; + case "BOOLEAN": + dataTypeSize = 1; //TODO(FF): how exactly is bool sent? acc. to doc. size is 1 bit? + objectIDColDataType = 16; + break; + case "DECIMAL": + break; + case "REAL": + case "INTEGER": + objectIDColDataType = 23; + dataTypeSize = 4; + formatCode = 0; + break; + case "VARCHAR": + objectIDColDataType = 1043; + typeModifier = Integer.parseInt( head[2] ); + dataTypeSize = Integer.parseInt( head[2] ); //TODO(FF): I just send the length of the varchar here, because the client doesn't complain. + formatCode = 0; + break; + case "SMALLINT": + objectIDColDataType = 21; + dataTypeSize = 2; + formatCode = 0; + break; + case "TINYINT": + objectIDColDataType = 21; //is the same oID as for int2 + dataTypeSize = 1; + formatCode = 0; + break; + case "DATE": //I did not find a list online for all OID's --> more info in comment on init. of oid's + case "TIMESTAMP": + case "TIME": + case "FILE": + case "IMAGE": + case "SOUND": + case "VIDEO": + default: + errorHandler.sendSimpleErrorMessage( "The DataType of the answer is not yet implemented, but there is a high chance that the query was executed in Polypheny" ); + break; + } + Object[] col = { fieldName, objectIDTable, attributeNoCol, objectIDColDataType, dataTypeSize, typeModifier, formatCode }; + valuesPerCol.add( col ); + } + communicationHandler.sendParseBindComplete(); + communicationHandler.sendRowDescription( numberOfFields, valuesPerCol ); + communicationHandler.sendDataRow( data ); + + rowsAffected = data.size(); + communicationHandler.sendCommandComplete( type, rowsAffected ); + communicationHandler.sendReadyForQuery( "I" ); + + break; + + case "DELETE": + //DELETE rows (rows = #rows deleted) + + case "MOVE": + //MOVE rows (rows = #rows the cursor's position has been changed by (??)) + + case "FETCH": + //FETCH rows (rows = #rows that have been retrieved from cursor) + + case "COPY": + //COPY rows (rows = #rows copied --> only on PSQL 8.2 and later)$ + + default: + errorHandler.sendSimpleErrorMessage( "Answer to client is not yet supported, but there is a high chance that the query was executed in Polypheny" ); + break; + + } + } +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceServerHandler.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceServerHandler.java new file mode 100644 index 0000000000..b4f46ace58 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceServerHandler.java @@ -0,0 +1,99 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.ArrayList; +import lombok.extern.slf4j.Slf4j; +import org.polypheny.db.transaction.TransactionManager; + +/** + * Forwards the message from the "netty flow" to the internal structure + */ +@Slf4j +public class PGInterfaceServerHandler extends ChannelInboundHandlerAdapter { + + TransactionManager transactionManager; + PGInterfaceErrorHandler errorHandler; + ArrayList preparedStatementNames = new ArrayList<>(); + ArrayList preparedMessages = new ArrayList<>(); + + + public PGInterfaceServerHandler( TransactionManager transactionManager ) { + this.transactionManager = transactionManager; + } + + + /** + * What the handler acutally does - it calls the logic to handle the incoming message + * + * @param ctx unique for connection + * @param msg incoming message decoded (to string) from decoder + */ + @Override + public void channelRead( ChannelHandlerContext ctx, Object msg ) { + PGInterfaceInboundCommunicationHandler interfaceInboundCommunicationHandler = new PGInterfaceInboundCommunicationHandler( "", ctx, transactionManager ); + interfaceInboundCommunicationHandler.decideCycle( msg, this ); + } + + + @Override + public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { + //cause.printStackTrace(); + ctx.close(); + } + + /** + * adds a name of a prepared statement to the list of names (each name should be unique) + * + * @param name of the prepared statemnt + */ + public void addPreparedStatementNames( String name ) { + preparedStatementNames.add( name ); + } + + /** + * adds a prepared message (contains info about the prepared message) to the list of prepared messages. The prepared messages are in the same order as the names in the list of names + * + * @param preparedMessage you want to add to the list + */ + public void addPreparedMessage( PGInterfacePreparedMessage preparedMessage ) { + preparedMessages.add( preparedMessage ); + } + + /** + * returns the list of all names from the prepared statements (each name should be unique) + * + * @return + */ + public ArrayList getPreparedStatementNames() { + return preparedStatementNames; + } + + /** + * gets a prepared message from the list of prepared messages. The prepared messages are in the same order as the names in the list of names + * + * @param idx the index of the message you want to return + * @return the message from the list at index idx + */ + public PGInterfacePreparedMessage getPreparedMessage( int idx ) { + return preparedMessages.get( idx ); + } + +} + diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceServerWriter.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceServerWriter.java new file mode 100644 index 0000000000..1fd5ec082a --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PGInterfaceServerWriter.java @@ -0,0 +1,295 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import lombok.extern.slf4j.Slf4j; + +/** + * Writes the messages that need to be sent to the client byte-wise on the buffer + */ +@Slf4j +public class PGInterfaceServerWriter { + + String type; + PGInterfaceMessage pgMsg; + ChannelHandlerContext ctx; + PGInterfaceErrorHandler errorHandler; + + + /** + * creates a server writer, writes response to client on byteBuf + * + * @param type what type of message should be written (in method writeOnByteBuf) + * possible types are: + * - s: write 1 string + * - c: write a char (or number) - writeByte + * - i: writes an int32 + * - ss: writes a message with two strings (the strings are safed as one in the msgBody of the pgMsg and are seperated by the delimiter) + * - sss: same as above, but with three strings + * - dr: write dataRow - writes the message dataRow to the client + * @param pgMsg The message object that contains all necessary information to send it to the client + * @param ctx channelHandlerContext specific to the connection + * @param pgInterfaceInboundCommunicationHandler + */ + public PGInterfaceServerWriter( String type, PGInterfaceMessage pgMsg, ChannelHandlerContext ctx, PGInterfaceInboundCommunicationHandler pgInterfaceInboundCommunicationHandler ) { + //TODO(FF): remove type from initialization and pass it through writeOnByteBuf (would be tidier - but works without problem the way it is) + this.type = type; + this.pgMsg = pgMsg; + this.ctx = ctx; + this.errorHandler = new PGInterfaceErrorHandler( ctx, pgInterfaceInboundCommunicationHandler ); + } + + + /** + * Handles different cases of writing things on the buffer (e.g. strings, int, etc. (see in PGInterfaceServerWriter constructor)) + * + * @return The buffer with the message written on it + */ + public ByteBuf writeOnByteBuf() { + ByteBuf buffer = ctx.alloc().buffer(); + switch ( type ) { + + //write string + case "s": + buffer.writeByte( pgMsg.getHeaderChar() ); + if ( pgMsg.isDefaultLength() ) { + buffer.writeInt( pgMsg.getLength() + pgMsg.getMsgBody().length() ); + } else { + buffer.writeInt( pgMsg.getLength() ); + } + buffer.writeBytes( pgMsg.getMsgBody().getBytes( StandardCharsets.US_ASCII ) ); + break; + + //write byte (char) + case "c": + buffer.writeByte( pgMsg.getHeaderChar() ); + if ( pgMsg.isDefaultLength() ) { + buffer.writeInt( pgMsg.getLength() + 1 ); + } else { + buffer.writeInt( pgMsg.getLength() ); + } + char msgBody = pgMsg.getMsgBody().charAt( 0 ); + buffer.writeByte( msgBody ); + break; + + //write int + case "i": + buffer.writeByte( pgMsg.getHeaderChar() ); + if ( pgMsg.isDefaultLength() ) { + buffer.writeInt( pgMsg.getLength() ); + } else { + buffer.writeInt( pgMsg.getLength() ); + } + int body = 0; + try { + body = Integer.parseInt( pgMsg.getMsgBody() ); + } catch ( NumberFormatException e ) { + errorHandler.sendSimpleErrorMessage( "couldn't transform int to string in PGInterfaceServerWriter" + e.getMessage() ); + } + buffer.writeInt( body ); + break; + + //write two strings (tag and message) + case "ss": + buffer = writeSeveralStrings( 2 ); //TODO(FF): maybe find better solution for strings + break; + + //write 3 strings, example, tag with three components + case "sss": + buffer = writeSeveralStrings( 3 ); + break; + + case "ssm": + //several strings modified --> ideally only use this in the future... + break; + + //send dataRow + case "dr": + buffer.writeByte( pgMsg.getHeaderChar() ); + int nbrCol = (pgMsg.getMsgBody().length() - pgMsg.getMsgBody().replaceAll( "§", "" ).length()) / 2; + + //should generally be not the default length, but also works with default length & length = 4 + if ( pgMsg.isDefaultLength() ) { + //data row does not include msg-length bytes in msg length + buffer.writeInt( pgMsg.getLength() - (nbrCol * 2) ); + } else { + //bcs it is including self + buffer.writeInt( pgMsg.getLength() + 4 ); + } + + buffer.writeShort( nbrCol ); + + //cut the last § (it is at the end) from the msgBody and set it as the new msgBody + String temp = pgMsg.getMsgBody().substring( 0, pgMsg.getMsgBody().length() - 1 ); + pgMsg.setMsgBody( temp ); + + int[] idx = new int[(nbrCol * 2)]; + String[] msgParts = pgMsg.getMsgPart( idx ); + + for ( int i = 0; i < ((nbrCol * 2) - 1); i++ ) { + + buffer.writeInt( Integer.parseInt( msgParts[i] ) ); + buffer.writeBytes( msgParts[i + 1].getBytes( StandardCharsets.UTF_8 ) ); + i++; + + } + break; + } + return buffer; + } + + + /** + * If there are several Strings that need to be written on the buffer (for example message and tag(s)) - The Strings are in the msgBody, seperated by the delimiter + * + * @param nbrStrings How many elements are in the msgBody (seperated by the delimiter) + * @return The buffer with the message written on it + */ + public ByteBuf writeSeveralStrings( int nbrStrings ) { + ByteBuf buffer = ctx.alloc().buffer(); + + buffer.writeByte( pgMsg.getHeaderChar() ); + if ( pgMsg.isDefaultLength() ) { + buffer.writeInt( pgMsg.getLength() + pgMsg.getMsgBody().length() - (nbrStrings - 1) ); + } else { + buffer.writeInt( pgMsg.getLength() ); + } + + int[] idx = new int[nbrStrings]; + String[] msgParts = pgMsg.getMsgPart( idx ); + + for ( int i = 0; i < nbrStrings; i++ ) { + buffer.writeBytes( msgParts[i].getBytes( StandardCharsets.UTF_8 ) ); + buffer.writeByte( 0 ); + } + + return buffer; + } + + + /** + * If the header is a number and not a letter, use this method to write the message (messages with a number as headers don't have a msgBody) + * + * @param header The header you want to write on the buffer + * @return The buffer with the message written on it + */ + public ByteBuf writeIntHeaderOnByteBuf( char header ) { + //write a int header... ("i" (for char headers) doesn't work TODO(FF): Figure out a way to do this with case "i" + //since headers with numbers are always indicators, don't I don't check for not standard lengths + ByteBuf buffer = ctx.alloc().buffer(); + + buffer.writeByte( header ); + buffer.writeInt( 4 ); // size excluding char + + return buffer; + } + + + /** + * Special case: write the rowDescription + * + * @param valuesPerCol The values that are needed to be sent in the rowDescription: + * String fieldName: string - column name (field name) (matters) + * int objectIDTable: int32 - ObjectID of table (if col can be id'd to table) --> otherwise 0 (doesn't matter to client while sending) + * int attributeNoCol: int16 - attr.no of col (if col can be id'd to table) --> otherwise 0 (doesn't matter to client while sending) + * int objectIDCol: int32 - objectID of parameter datatype --> 0 = unspecified (doesn't matter to client while sending, but maybe later) - see comment where this method is called from + * int dataTypeSize: int16 - size of dataType (if formatCode = 1, this needs to be set for colValLength) (doesn't matter to client while sending) + * int typeModifier: int32 - The value will generally be -1 (doesn't matter to client while sending) + * int formatCode: int16 - 0: Text | 1: Binary --> sends everything with writeBytes(formatCode = 0), if sent with writeInt it needs to be 1 (matters) + * @return The buffer with the message written on it + */ + public ByteBuf writeRowDescription( ArrayList valuesPerCol ) { + //I don't check for length, bcs rowDescription is always the same + ByteBuf buffer = ctx.alloc().buffer(); + + String fieldName; + int objectIDTable; + int attributeNoCol; + int objectIDCol; + int dataTypeSize; + int typeModifier; + int formatCode; + + int messageLength = 0; + buffer.writeByte( pgMsg.getHeaderChar() ); + + for ( int i = 0; i < valuesPerCol.size(); i++ ) { + messageLength += (6 + valuesPerCol.get( 0 ).length); + } + + buffer.writeInt( pgMsg.getLength() + messageLength ); + buffer.writeShort( Integer.parseInt( pgMsg.getMsgBody() ) ); + + for ( Object[] oneCol : valuesPerCol ) { + ByteBuf bufferTemp = ctx.alloc().buffer(); + fieldName = oneCol[0].toString(); + objectIDTable = (Integer) oneCol[1]; + attributeNoCol = (Integer) oneCol[2]; + objectIDCol = (Integer) oneCol[3]; + dataTypeSize = (Integer) oneCol[4]; + typeModifier = (Integer) oneCol[5]; + formatCode = (Integer) oneCol[6]; + + bufferTemp.writeBytes( fieldName.getBytes( StandardCharsets.UTF_8 ) ); + bufferTemp.writeByte( 0 ); + bufferTemp.writeInt( objectIDTable ); + bufferTemp.writeShort( attributeNoCol ); + bufferTemp.writeInt( objectIDCol ); + bufferTemp.writeShort( dataTypeSize ); + bufferTemp.writeInt( typeModifier ); + bufferTemp.writeShort( formatCode ); + + buffer.writeBytes( bufferTemp ); + } + //String bla = buffer.toString( Charset.defaultCharset() ); + return buffer; + } + + + public ByteBuf writeSimpleErrorMessage( LinkedHashMap fields ) { + ByteBuf buffer = ctx.alloc().buffer(); + int msgLength = 4 + 1; + + for ( String i : fields.values() ) { + msgLength += (i.length() + 1); + } + + buffer.writeByte( pgMsg.getHeaderChar() ); + buffer.writeInt( msgLength ); + + for ( String i : fields.values() ) { + msgLength += (i.length() + 1); + } + + fields.forEach( ( fieldType, fieldValue ) -> { + buffer.writeByte( fieldType ); + buffer.writeBytes( fieldValue.getBytes( StandardCharsets.UTF_8 ) ); + buffer.writeByte( 0 ); + } ); + + buffer.writeByte( 0 ); + + return buffer; + } + + +} diff --git a/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PostgresqlInterfacePlugin.java b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PostgresqlInterfacePlugin.java new file mode 100644 index 0000000000..bccf3292c0 --- /dev/null +++ b/plugins/postgresql-interface/src/main/java/org/polypheny/db/postgresql/PostgresqlInterfacePlugin.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.postgresql; + +import java.util.HashMap; +import java.util.Map; +import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; +import org.polypheny.db.iface.QueryInterfaceManager; + +public class PostgresqlInterfacePlugin extends Plugin { + + /** + * Constructor to be used by plugin manager for plugin instantiation. + * Your plugins have to provide constructor with this exact signature to + * be successfully loaded by manager. + * + * @param wrapper + */ + public PostgresqlInterfacePlugin( PluginWrapper wrapper ) { + super( wrapper ); + } + + + @Override + public void start() { + // Add HTTP interface + Map httpSettings = new HashMap<>(); + httpSettings.put( "port", "5432" ); + httpSettings.put( "maxUploadSizeMb", "10000" ); + QueryInterfaceManager.addInterfaceType( "postgres", PGInterface.class, httpSettings ); + } + +} diff --git a/plugins/postgresql-interface/src/main/resources/log4j2.xml b/plugins/postgresql-interface/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..5f6546b74f --- /dev/null +++ b/plugins/postgresql-interface/src/main/resources/log4j2.xml @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 5b88423a1f..83e5bd45ae 100644 --- a/settings.gradle +++ b/settings.gradle @@ -30,6 +30,7 @@ include 'plugins:jdbc-adapter-framework' include 'plugins:avatica-interface' include 'plugins:rest-interface' include 'plugins:http-interface' +include 'plugins:postgresql-interface' // adapters plugins include 'plugins:hsqldb-adapter'