Skip to content

Commit fd2f3d3

Browse files
committed
Replace timeout handling to use SharedTimer
Replaces the timeout handling for basic and bulk TDS commands to use a new SharedTimer class. SharedTimer provides a static method for fetching an existing static object or creating one on demand. Usage is tracked through reference counting and callers are required to call removeRef() when they will no longer be using the SharedTimer. If the SharedTimer does not have any more references then its internal ScheduledThreadPoolExecutor will be shutdown. The SharedTimer is cached at the Connection level so that repeated invocations do not create new timers. Connections only create timers on first use so if no actions involve a timeout then no timer is fetched or created. If a Connection does create a timer then it will be released when the Connection closed. Properly written JDBC applications that always close their Connection objects when they are finished using them should not have any extra threads running after they are all closed. Applications that do not use query timeouts will not have any extra threads created as they are only done on demand. Applications that use timeouts and use a JDBC connection pool will have a single shared object across all JDBC connections as long as there are some open connections in the pool with timeouts enabled. Interrupt actions to handle a timeout are executed in their own thread. A handler thread is created when the timeout occurs with the thread name matching the connection id of the client connection that created the timeout. If the timeout is canceled prior to the interrupt action being executed, say because the command finished, then no handler thread is created. Note that the sharing of the timers happens across all Connections, not just Connections with the same JDBC URL and properties.
1 parent d639870 commit fd2f3d3

File tree

10 files changed

+226
-226
lines changed

10 files changed

+226
-226
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.microsoft.sqlserver.jdbc;
2+
3+
import java.util.UUID;
4+
import java.util.concurrent.atomic.AtomicLong;
5+
6+
7+
abstract class AbstractTimeoutTask implements SqlServerTimerTask {
8+
private static final AtomicLong COUNTER = new AtomicLong(0);
9+
private final UUID connectionId;
10+
11+
AbstractTimeoutTask(UUID connectionId) {
12+
this.connectionId = connectionId;
13+
}
14+
15+
/**
16+
* Action to interrupt the command implemented by concrete subclasses.
17+
*/
18+
protected abstract void interrupt();
19+
20+
@Override
21+
public final void run() {
22+
// Create a new thread to run the interrupt to ensure that blocking operations performed
23+
// by the interrupt do not hang the primary timer thread.
24+
String name = "mssql-timeout-task-" + COUNTER.incrementAndGet() + "-" + connectionId;
25+
Thread thread = new Thread(this::interrupt, name);
26+
thread.setDaemon(true);
27+
thread.start();
28+
}
29+
}

src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Set;
5353
import java.util.SimpleTimeZone;
5454
import java.util.TimeZone;
55+
import java.util.concurrent.ScheduledFuture;
5556
import java.util.concurrent.SynchronousQueue;
5657
import java.util.concurrent.ThreadPoolExecutor;
5758
import java.util.concurrent.TimeUnit;
@@ -3016,6 +3017,10 @@ void setDataLoggable(boolean value) {
30163017
dataIsLoggable = value;
30173018
}
30183019

3020+
SharedTimer getSharedTimer() {
3021+
return con.getSharedTimer();
3022+
}
3023+
30193024
private TDSCommand command = null;
30203025

30213026
// TDS message type (Query, RPC, DTC, etc.) sent at the beginning
@@ -6236,7 +6241,7 @@ final class TDSReaderMark {
62366241
final class TDSReader {
62376242
private final static Logger logger = Logger.getLogger("com.microsoft.sqlserver.jdbc.internals.TDS.Reader");
62386243
final private String traceID;
6239-
private TimeoutCommand<TDSCommand> timeoutCommand;
6244+
private ScheduledFuture<?> timeout;
62406245

62416246
final public String toString() {
62426247
return traceID;
@@ -6390,9 +6395,8 @@ synchronized final boolean readPacket() throws SQLServerException {
63906395
// terminate the connection.
63916396
if ((command.getCancelQueryTimeoutSeconds() > 0 && command.getQueryTimeoutSeconds() > 0)) {
63926397
// if a timeout is configured with this object, add it to the timeout poller
6393-
int timeout = command.getCancelQueryTimeoutSeconds() + command.getQueryTimeoutSeconds();
6394-
this.timeoutCommand = new TdsTimeoutCommand(timeout, this.command, this.con);
6395-
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(this.timeoutCommand);
6398+
int seconds = command.getCancelQueryTimeoutSeconds() + command.getQueryTimeoutSeconds();
6399+
this.timeout = con.getSharedTimer().schedule(new TdsCommandTimeoutTask(command, con), seconds);
63966400
}
63976401
}
63986402
// First, read the packet header.
@@ -6413,8 +6417,9 @@ synchronized final boolean readPacket() throws SQLServerException {
64136417
}
64146418

64156419
// if execution was subject to timeout then stop timing
6416-
if (this.timeoutCommand != null) {
6417-
TimeoutPoller.getTimeoutPoller().remove(this.timeoutCommand);
6420+
if (this.timeout != null) {
6421+
this.timeout.cancel(false);
6422+
this.timeout = null;
64186423
}
64196424
// Header size is a 2 byte unsigned short integer in big-endian order.
64206425
int packetLength = Util.readUnsignedShortBigEndian(newPacket.header, TDS.PACKET_HEADER_MESSAGE_LENGTH);
@@ -7003,42 +7008,6 @@ final void trySetSensitivityClassification(SensitivityClassification sensitivity
70037008
}
70047009

70057010

7006-
/**
7007-
* The tds default implementation of a timeout command
7008-
*/
7009-
class TdsTimeoutCommand extends TimeoutCommand<TDSCommand> {
7010-
public TdsTimeoutCommand(int timeout, TDSCommand command, SQLServerConnection sqlServerConnection) {
7011-
super(timeout, command, sqlServerConnection);
7012-
}
7013-
7014-
public void interrupt() {
7015-
TDSCommand command = getCommand();
7016-
SQLServerConnection sqlServerConnection = getSqlServerConnection();
7017-
try {
7018-
// If TCP Connection to server is silently dropped, exceeding the query timeout
7019-
// on the same connection does
7020-
// not throw SQLTimeoutException
7021-
// The application stops responding instead until SocketTimeoutException is
7022-
// thrown. In this case, we must
7023-
// manually terminate the connection.
7024-
if (null == command && null != sqlServerConnection) {
7025-
sqlServerConnection.terminate(SQLServerException.DRIVER_ERROR_IO_FAILED,
7026-
SQLServerException.getErrString("R_connectionIsClosed"));
7027-
} else {
7028-
// If the timer wasn't canceled before it ran out of
7029-
// time then interrupt the registered command.
7030-
command.interrupt(SQLServerException.getErrString("R_queryTimedOut"));
7031-
}
7032-
} catch (SQLServerException e) {
7033-
// Unfortunately, there's nothing we can do if we
7034-
// fail to time out the request. There is no way
7035-
// to report back what happened.
7036-
assert null != command;
7037-
command.log(Level.FINE, "Command could not be timed out. Reason: " + e.getMessage());
7038-
}
7039-
}
7040-
}
7041-
70427011
/**
70437012
* TDSCommand encapsulates an interruptable TDS conversation.
70447013
*
@@ -7160,7 +7129,7 @@ protected void setProcessedResponse(boolean processedResponse) {
71607129
private volatile boolean readingResponse;
71617130
private int queryTimeoutSeconds;
71627131
private int cancelQueryTimeoutSeconds;
7163-
private TdsTimeoutCommand timeoutCommand;
7132+
private ScheduledFuture<?> timeout;
71647133

71657134
protected int getQueryTimeoutSeconds() {
71667135
return this.queryTimeoutSeconds;
@@ -7576,8 +7545,8 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException {
75767545
// If command execution is subject to timeout then start timing until
75777546
// the server returns the first response packet.
75787547
if (queryTimeoutSeconds > 0) {
7579-
this.timeoutCommand = new TdsTimeoutCommand(queryTimeoutSeconds, this, null);
7580-
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(this.timeoutCommand);
7548+
SQLServerConnection conn = tdsReader != null ? tdsReader.getConnection() : null;
7549+
this.timeout = tdsWriter.getSharedTimer().schedule(new TdsCommandTimeoutTask(this, conn), queryTimeoutSeconds);
75817550
}
75827551

75837552
if (logger.isLoggable(Level.FINEST))
@@ -7600,8 +7569,9 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException {
76007569
} finally {
76017570
// If command execution was subject to timeout then stop timing as soon
76027571
// as the server returns the first response packet or errors out.
7603-
if (this.timeoutCommand != null) {
7604-
TimeoutPoller.getTimeoutPoller().remove(this.timeoutCommand);
7572+
if (this.timeout != null) {
7573+
this.timeout.cancel(false);
7574+
this.timeout = null;
76057575
}
76067576
}
76077577

src/main/java/com/microsoft/sqlserver/jdbc/SQLServerBulkCopy.java

Lines changed: 20 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.SimpleTimeZone;
4343
import java.util.TimeZone;
4444
import java.util.UUID;
45+
import java.util.concurrent.ScheduledFuture;
4546
import java.util.logging.Level;
4647

4748
import javax.sql.RowSet;
@@ -246,31 +247,7 @@ class BulkColumnMetaData {
246247
*/
247248
private int srcColumnCount;
248249

249-
/**
250-
* Timeout for the bulk copy command
251-
*/
252-
private final class BulkTimeoutCommand extends TimeoutCommand<TDSCommand> {
253-
public BulkTimeoutCommand(int timeout, TDSCommand command, SQLServerConnection sqlServerConnection) {
254-
super(timeout, command, sqlServerConnection);
255-
}
256-
257-
@Override
258-
public void interrupt() {
259-
TDSCommand command = getCommand();
260-
// If the timer wasn't canceled before it ran out of
261-
// time then interrupt the registered command.
262-
try {
263-
command.interrupt(SQLServerException.getErrString("R_queryTimedOut"));
264-
} catch (SQLServerException e) {
265-
// Unfortunately, there's nothing we can do if we
266-
// fail to time out the request. There is no way
267-
// to report back what happened.
268-
command.log(Level.FINE, "Command could not be timed out. Reason: " + e.getMessage());
269-
}
270-
}
271-
}
272-
273-
private BulkTimeoutCommand timeoutCommand;
250+
private ScheduledFuture<?> timeout;
274251

275252
/**
276253
* The maximum temporal precision we can send when using varchar(precision) in bulkcommand, to send a
@@ -646,16 +623,13 @@ private void sendBulkLoadBCP() throws SQLServerException {
646623
final class InsertBulk extends TDSCommand {
647624
InsertBulk() {
648625
super("InsertBulk", 0, 0);
649-
int timeoutSeconds = copyOptions.getBulkCopyTimeout();
650-
timeoutCommand = timeoutSeconds > 0 ? new BulkTimeoutCommand(timeoutSeconds, this, null) : null;
651626
}
652627

653628
final boolean doExecute() throws SQLServerException {
654-
if (null != timeoutCommand) {
655-
if (logger.isLoggable(Level.FINEST))
656-
logger.finest(this.toString() + ": Starting bulk timer...");
657-
658-
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(timeoutCommand);
629+
int timeoutSeconds = copyOptions.getBulkCopyTimeout();
630+
if (timeoutSeconds > 0) {
631+
connection.checkClosed();
632+
timeout = connection.getSharedTimer().schedule(new TdsBulkCommandTimeoutTask(this, connection), timeoutSeconds);
659633
}
660634

661635
// doInsertBulk inserts the rows in one batch. It returns true if there are more rows in
@@ -671,21 +645,27 @@ final boolean doExecute() throws SQLServerException {
671645
}
672646

673647
// Check whether it is a timeout exception.
674-
if (rootCause instanceof SQLException) {
675-
checkForTimeoutException((SQLException) rootCause, timeoutCommand);
648+
if (rootCause instanceof SQLException && timeout != null && timeout.isDone()) {
649+
SQLException sqlEx = (SQLException) rootCause;
650+
if (sqlEx.getSQLState() != null
651+
&& sqlEx.getSQLState().equals(SQLState.STATEMENT_CANCELED.getSQLStateCode())) {
652+
// If SQLServerBulkCopy is managing the transaction, a rollback is needed.
653+
if (copyOptions.isUseInternalTransaction()) {
654+
connection.rollback();
655+
}
656+
throw new SQLServerException(SQLServerException.getErrString("R_queryTimedOut"),
657+
SQLState.STATEMENT_CANCELED, DriverError.NOT_SET, sqlEx);
658+
}
676659
}
677660

678661
// It is not a timeout exception. Re-throw.
679662
throw topLevelException;
680663
}
681664

682-
if (null != timeoutCommand) {
683-
if (logger.isLoggable(Level.FINEST))
684-
logger.finest(this.toString() + ": Stopping bulk timer...");
685-
686-
TimeoutPoller.getTimeoutPoller().remove(timeoutCommand);
665+
if (timeout != null) {
666+
timeout.cancel(true);
667+
timeout = null;
687668
}
688-
689669
return true;
690670
}
691671
}
@@ -1145,22 +1125,6 @@ private void writeColumnMetaData(TDSWriter tdsWriter) throws SQLServerException
11451125
}
11461126
}
11471127

1148-
/**
1149-
* Helper method that throws a timeout exception if the cause of the exception was that the query was cancelled
1150-
*/
1151-
private void checkForTimeoutException(SQLException e, BulkTimeoutCommand timeoutCommand) throws SQLServerException {
1152-
if ((null != e.getSQLState()) && (e.getSQLState().equals(SQLState.STATEMENT_CANCELED.getSQLStateCode()))
1153-
&& timeoutCommand.canTimeout()) {
1154-
// If SQLServerBulkCopy is managing the transaction, a rollback is needed.
1155-
if (copyOptions.isUseInternalTransaction()) {
1156-
connection.rollback();
1157-
}
1158-
1159-
throw new SQLServerException(SQLServerException.getErrString("R_queryTimedOut"),
1160-
SQLState.STATEMENT_CANCELED, DriverError.NOT_SET, e);
1161-
}
1162-
}
1163-
11641128
/**
11651129
* Validates whether the source JDBC types are compatible with the destination table data types. We need to do this
11661130
* only once for the whole bulk copy session.

src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,18 @@ public class SQLServerConnection implements ISQLServerConnection, java.io.Serial
141141

142142
private Boolean isAzureDW = null;
143143

144+
private SharedTimer sharedTimer;
145+
146+
SharedTimer getSharedTimer() {
147+
if (state == State.Closed) {
148+
throw new IllegalStateException("Connection is closed");
149+
}
150+
if (sharedTimer == null) {
151+
this.sharedTimer = SharedTimer.getTimer();
152+
}
153+
return this.sharedTimer;
154+
}
155+
144156
static class CityHash128Key implements java.io.Serializable {
145157

146158
/**
@@ -3174,6 +3186,11 @@ public void close() throws SQLServerException {
31743186
// with the connection.
31753187
setState(State.Closed);
31763188

3189+
if (sharedTimer != null) {
3190+
sharedTimer.removeRef();
3191+
sharedTimer = null;
3192+
}
3193+
31773194
// Close the TDS channel. When the channel is closed, the server automatically
31783195
// rolls back any pending transactions and closes associated resources like
31793196
// prepared handles.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Microsoft JDBC Driver for SQL Server Copyright(c) Microsoft Corporation All rights reserved. This program is made
3+
* available under the terms of the MIT License. See the LICENSE file in the project root for more information.
4+
*/
5+
package com.microsoft.sqlserver.jdbc;
6+
7+
import java.util.concurrent.ScheduledFuture;
8+
import java.util.concurrent.ScheduledThreadPoolExecutor;
9+
import java.util.concurrent.ThreadFactory;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.atomic.AtomicLong;
12+
13+
14+
class SharedTimer {
15+
private static final AtomicLong CORE_THREAD_COUNTER = new AtomicLong();
16+
private static SharedTimer INSTANCE;
17+
18+
private final long id = CORE_THREAD_COUNTER.getAndIncrement();
19+
private int refCount = 0;
20+
private ScheduledThreadPoolExecutor executor;
21+
22+
private SharedTimer() {
23+
executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
24+
@Override
25+
public Thread newThread(Runnable task) {
26+
return new Thread(task, "mssql-jdbc-shared-timer-core-" + id);
27+
}
28+
});
29+
executor.setRemoveOnCancelPolicy(true);
30+
}
31+
32+
public synchronized void removeRef() {
33+
if (refCount <= 0) {
34+
throw new IllegalStateException("removeRef() called more than actual references");
35+
}
36+
refCount -= 1;
37+
if (refCount == 0) {
38+
// Removed last reference so perform cleanup
39+
executor.shutdownNow();
40+
executor = null;
41+
INSTANCE = null;
42+
}
43+
}
44+
45+
public static synchronized SharedTimer getTimer() {
46+
if (INSTANCE == null) {
47+
// No shared object exists so create a new one
48+
INSTANCE = new SharedTimer();
49+
}
50+
INSTANCE.refCount += 1;
51+
return INSTANCE;
52+
}
53+
54+
public ScheduledFuture<?> schedule(SqlServerTimerTask task, long delaySeconds) {
55+
return schedule(task, delaySeconds, TimeUnit.SECONDS);
56+
}
57+
58+
public ScheduledFuture<?> schedule(SqlServerTimerTask task, long delay, TimeUnit unit) {
59+
if (executor == null) {
60+
throw new IllegalStateException("Cannot schedule tasks after shutdown");
61+
}
62+
return executor.schedule(task, delay, unit);
63+
}
64+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.microsoft.sqlserver.jdbc;
2+
3+
public interface SqlServerTimerTask extends Runnable {}

0 commit comments

Comments
 (0)