diff --git a/connection.c b/connection.c index 088ef3d..79f8a07 100644 --- a/connection.c +++ b/connection.c @@ -26,27 +26,6 @@ /* Length of host */ #define HOST_LEN 256 -/* - * Connection cache hash table entry - * - * The lookup key in this hash table is the foreign server OID plus the user - * mapping OID. (We use just one connection per user per foreign server, - * so that we can ensure all scans use the same snapshot during a query.) - */ -typedef struct ConnCacheKey -{ - Oid serverid; /* OID of foreign server */ - Oid userid; /* OID of local user whose mapping we use */ -} ConnCacheKey; - -typedef struct ConnCacheEntry -{ - ConnCacheKey key; /* hash key (must be first) */ - MYSQL *conn; /* connection to foreign server, or NULL */ - bool invalidated; /* true if reconnect is pending */ - uint32 server_hashvalue; /* hash value of foreign server OID */ - uint32 mapping_hashvalue; /* hash value of user mapping OID */ -} ConnCacheEntry; /* * Connection cache (initialized on first use) @@ -55,6 +34,25 @@ static HTAB *ConnectionHash = NULL; static void mysql_inval_callback(Datum arg, int cacheid, uint32 hashvalue); +ConnCacheEntry *get_conn_cache_entry(ForeignServer *server, UserMapping *user, mysql_opt *opt) { + bool found; + ConnCacheEntry *entry; + ConnCacheKey key; + + if (ConnectionHash == NULL) { + return NULL; + } + + key.serverid = server->serverid; + key.userid = user->userid; + + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + return NULL; + + return entry; +} + /* * mysql_get_connection: * Get a connection which can be used to execute queries on the remote diff --git a/deparse.c b/deparse.c index eca8290..64d35da 100644 --- a/deparse.c +++ b/deparse.c @@ -1764,6 +1764,8 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, else if (inner_cxt.state == FDW_COLLATE_SAFE && collation == inner_cxt.collation) state = FDW_COLLATE_SAFE; + else if (collation == DEFAULT_COLLATION_OID) + state = FDW_COLLATE_NONE; else state = FDW_COLLATE_UNSAFE; } diff --git a/mysql_fdw.c b/mysql_fdw.c index 9a7f4bf..07f8dc0 100644 --- a/mysql_fdw.c +++ b/mysql_fdw.c @@ -34,6 +34,7 @@ #endif #include "commands/defrem.h" #include "commands/explain.h" +#include "common/hashfn.h" #include "catalog/heap.h" #include "catalog/pg_type.h" #include "foreign/fdwapi.h" @@ -585,6 +586,8 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags) TupleTableSlot *tupleSlot = node->ss.ss_ScanTupleSlot; TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor; MYSQL *conn; + ConnCacheEntry *cache_entry; + StmtCacheEntry *stmt_cache_entry; RangeTblEntry *rte; MySQLFdwExecState *festate; EState *estate = node->ss.ps.state; @@ -602,6 +605,8 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags) int rtindex; List *fdw_private = fsplan->fdw_private; char sql_mode[255]; + bool prep_stmt_found; + uint32 query_hash; /* * We'll save private state in node->fdw_state. @@ -693,37 +698,71 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags) "mysql_fdw temporary data", ALLOCSET_DEFAULT_SIZES); - if (wait_timeout > 0) - { - /* Set the session timeout in seconds */ - sprintf(timeout, "SET wait_timeout = %d", wait_timeout); - mysql_query(festate->conn, timeout); + cache_entry = get_conn_cache_entry(server, user, options); + if (cache_entry == NULL) { + elog(ERROR, "beginForeignScan, no valid cache entry from connection cache!"); } - if (interactive_timeout > 0) - { - /* Set the session timeout in seconds */ - sprintf(timeout, "SET interactive_timeout = %d", interactive_timeout); - mysql_query(festate->conn, timeout); + if (cache_entry->PrepStmtCache == NULL) { + + + HASHCTL ctl; + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(uint32); + ctl.entrysize = sizeof(StmtCacheEntry); + + ctl.hcxt = CacheMemoryContext; + cache_entry->PrepStmtCache = hash_create("mysql_fdw per-connection statement cache", 200, + &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); } - snprintf(sql_mode, sizeof(sql_mode), "SET sql_mode = '%s'", - options->sql_mode); - if (mysql_query(festate->conn, sql_mode) != 0) - mysql_error_print(festate->conn); + query_hash = string_hash(festate->query, strlen(festate->query)); + stmt_cache_entry = hash_search(cache_entry->PrepStmtCache, &query_hash, HASH_ENTER, &prep_stmt_found); + elog(WARNING, "'%s'", festate->query); + if (!prep_stmt_found) { + elog(WARNING, "No cache entry found: BE SURE YOURE CACHE PLAN CONFIG IS GENERIC NOT AUTO"); + stmt_cache_entry->key = query_hash; - /* Initialize the MySQL statement */ - festate->stmt = mysql_stmt_init(festate->conn); - if (festate->stmt == NULL) - ereport(ERROR, - (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), - errmsg("failed to initialize the mysql query: \n%s", - mysql_error(festate->conn)))); + if (wait_timeout > 0) + { + /* Set the session timeout in seconds */ + sprintf(timeout, "SET wait_timeout = %d", wait_timeout); + mysql_query(festate->conn, timeout); + } - /* Prepare MySQL statement */ - if (mysql_stmt_prepare(festate->stmt, festate->query, - strlen(festate->query)) != 0) - mysql_stmt_error_print(festate, "failed to prepare the MySQL query"); + if (interactive_timeout > 0) + { + /* Set the session timeout in seconds */ + sprintf(timeout, "SET interactive_timeout = %d", interactive_timeout); + mysql_query(festate->conn, timeout); + } + + snprintf(sql_mode, sizeof(sql_mode), "SET sql_mode = '%s'", + options->sql_mode); + + if (mysql_query(festate->conn, sql_mode) != 0) + mysql_error_print(festate->conn); + + /* Initialize the MySQL statement */ + festate->stmt = mysql_stmt_init(festate->conn); + if (festate->stmt == NULL) + ereport(ERROR, + (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), + errmsg("failed to initialize the mysql query: \n%s", + mysql_error(festate->conn)))); + + + + /* Prepare MySQL statement */ + if (mysql_stmt_prepare(festate->stmt, festate->query, + strlen(festate->query)) != 0) + mysql_stmt_error_print(festate, "failed to prepare the MySQL query"); + + stmt_cache_entry->stmt = festate->stmt; + } else { + festate->stmt = stmt_cache_entry->stmt; + } /* Prepare for output conversion of parameters used in remote query. */ numParams = list_length(fsplan->fdw_exprs); @@ -789,7 +828,7 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags) /* Bind the results pointers for the prepare statements */ if (mysql_stmt_bind_result(festate->stmt, festate->table->mysql_bind) != 0) - mysql_stmt_error_print(festate, "failed to bind the MySQL query"); + mysql_stmt_error_print(festate, "failed to bind the MySQL result"); } /* @@ -867,9 +906,6 @@ mysqlIterateForeignScan(ForeignScanState *node) #else ExecStoreTuple(tup, tupleSlot, InvalidBuffer, false); #endif - else - mysql_stmt_close(festate->stmt); - /* * Release locally palloc'd space and values of pass-by-reference * datums, as well. @@ -976,12 +1012,6 @@ mysqlEndForeignScan(ForeignScanState *node) mysql_free_result(festate->table->mysql_res); festate->table->mysql_res = NULL; } - - if (festate->stmt) - { - mysql_stmt_close(festate->stmt); - festate->stmt = NULL; - } } /* @@ -1051,11 +1081,15 @@ mysqlGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, &attrs_used); + /* + * Identify which baserestrictinfo clauses can be sent to the remote + * server and which can't. + */ foreach(lc, baserel->baserestrictinfo) { RestrictInfo *ri = (RestrictInfo *) lfirst(lc); - if (mysql_is_foreign_expr(root, baserel, ri->clause, false)) + if (mysql_is_foreign_expr(root, baserel, ri->clause, true)) fpinfo->remote_conds = lappend(fpinfo->remote_conds, ri); else fpinfo->local_conds = lappend(fpinfo->local_conds, ri); @@ -1233,14 +1267,14 @@ mysqlGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, { Cost startup_cost; Cost total_cost; + Path *path; /* Estimate costs */ mysqlEstimateCosts(root, baserel, &startup_cost, &total_cost, foreigntableid); /* Create a ForeignPath node and add it as only possible path */ - add_path(baserel, (Path *) - create_foreignscan_path(root, baserel, + path = (Path *)create_foreignscan_path(root, baserel, NULL, /* default pathtarget */ baserel->rows, startup_cost, @@ -1248,11 +1282,14 @@ mysqlGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, NIL, /* no pathkeys */ baserel->lateral_relids, NULL, /* no extra plan */ - NULL)); /* no fdw_private data */ + NULL); /* no fdw_private data */ + add_path(baserel, path); + /* Add paths with pathkeys */ mysql_add_paths_with_pathkeys(root, baserel, NULL, startup_cost, total_cost); + } @@ -2643,7 +2680,7 @@ bind_stmt_params_and_exec(ForeignScanState *node) /* Bind the results pointers for the prepare statements */ if (mysql_stmt_bind_result(festate->stmt, festate->table->mysql_bind) != 0) - mysql_stmt_error_print(festate, "failed to bind the MySQL query"); + mysql_stmt_error_print(festate, "failed to bind the MySQL results"); MemoryContextSwitchTo(oldcontext); } diff --git a/mysql_fdw.h b/mysql_fdw.h index 81ea4bf..06185af 100644 --- a/mysql_fdw.h +++ b/mysql_fdw.h @@ -256,6 +256,36 @@ typedef struct MySQLColumn } MySQLColumn; +/* + * Connection cache hash table entry + * + * The lookup key in this hash table is the foreign server OID plus the user + * mapping OID. (We use just one connection per user per foreign server, + * so that we can ensure all scans use the same snapshot during a query.) + */ +typedef struct ConnCacheKey +{ + Oid serverid; /* OID of foreign server */ + Oid userid; /* OID of local user whose mapping we use */ +} ConnCacheKey; + +typedef struct ConnCacheEntry +{ + ConnCacheKey key; /* hash key (must be first) */ + MYSQL *conn; /* connection to foreign server, or NULL */ + bool invalidated; /* true if reconnect is pending */ + uint32 server_hashvalue; /* hash value of foreign server OID */ + uint32 mapping_hashvalue; /* hash value of user mapping OID */ + HTAB *PrepStmtCache; + +} ConnCacheEntry; + +typedef struct StmtCacheEntry +{ + uint32 key; + MYSQL_STMT *stmt; +} StmtCacheEntry; + extern int ((mysql_options) (MYSQL *mysql, enum mysql_option option, const void *arg)); extern int ((mysql_stmt_prepare) (MYSQL_STMT *stmt, const char *query, @@ -352,4 +382,6 @@ MYSQL *mysql_fdw_connect(mysql_opt *opt); void mysql_cleanup_connection(void); void mysql_release_connection(MYSQL *conn); +ConnCacheEntry *get_conn_cache_entry(ForeignServer *server, UserMapping *user, mysql_opt *opt); + #endif /* MYSQL_FDW_H */