Skip to content
This repository has been archived by the owner on May 10, 2020. It is now read-only.

Add BigChar support and iterator for columns #73

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 29 additions & 23 deletions tiberius/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Query results and resultsets
use std::marker::PhantomData;
use futures::{Async, Future, Poll, Sink, Stream};
use futures::sync::oneshot;
use futures::{Async, Future, Poll, Sink, Stream};
use futures_state_stream::{StateStream, StreamEvent};
use tokens::{DoneStatus, TdsResponseToken, TokenRow};
use types::FromColumnData;
use {BoxableIo, SqlConnection, StmtResult, Error, Result};
use std::marker::PhantomData;
use std::{iter, slice};
use tokens::{DoneStatus, MetaDataColumn, TdsResponseToken, TokenRow};
use types::{ColumnData, FromColumnData};
use {BoxableIo, Error, Result, SqlConnection, StmtResult};

/// A query result consists of multiple query streams (amount of executed queries = amount of results)
#[must_use = "streams do nothing unless polled"]
Expand Down Expand Up @@ -94,9 +95,9 @@ impl<I: BoxableIo, R: StmtResult<I>> StateStream for ResultSetStream<I, R> {
let conn = self.conn.take().unwrap();
let (sender, receiver) = oneshot::channel();
self.receiver = Some(receiver);
return Ok(Async::Ready(
StreamEvent::Next(R::from_connection(conn, sender)),
));
return Ok(Async::Ready(StreamEvent::Next(R::from_connection(
conn, sender,
))));
}
}
let conn = self.conn.take().unwrap();
Expand All @@ -107,17 +108,16 @@ impl<I: BoxableIo, R: StmtResult<I>> StateStream for ResultSetStream<I, R> {
/// A stream of [`Rows`](struct.QueryRow.html) returned for the current resultset
#[must_use = "streams do nothing unless polled"]
pub struct QueryStream<I: BoxableIo> {
inner: ResultInner<I>
inner: ResultInner<I>,
}

struct ResultInner<I: BoxableIo> (
Option<(SqlConnection<I>, oneshot::Sender<SqlConnection<I>>)>,
);
struct ResultInner<I: BoxableIo>(Option<(SqlConnection<I>, oneshot::Sender<SqlConnection<I>>)>);

impl<I: BoxableIo> ResultInner<I> {
fn send_back(&mut self) -> Result<bool> {
if let Some((conn, ret_conn)) = self.0.take() {
ret_conn.send(conn)
ret_conn
.send(conn)
.map_err(|_| Error::Canceled)
.map(|_| true)
} else {
Expand Down Expand Up @@ -168,7 +168,10 @@ impl<'a, I: BoxableIo> Stream for QueryStream<I> {
impl<'a, I: BoxableIo> StmtResult<I> for QueryStream<I> {
type Result = QueryStream<I>;

fn from_connection(conn: SqlConnection<I>, ret_conn: oneshot::Sender<SqlConnection<I>>) -> QueryStream<I> {
fn from_connection(
conn: SqlConnection<I>,
ret_conn: oneshot::Sender<SqlConnection<I>>,
) -> QueryStream<I> {
QueryStream {
inner: ResultInner(Some((conn, ret_conn))),
}
Expand Down Expand Up @@ -203,9 +206,9 @@ impl<I: BoxableIo> Future for ExecFuture<I> {
self.single_token = false;
false
}
TdsResponseToken::Done(ref done) |
TdsResponseToken::DoneInProc(ref done) |
TdsResponseToken::DoneProc(ref done) => {
TdsResponseToken::Done(ref done)
| TdsResponseToken::DoneInProc(ref done)
| TdsResponseToken::DoneProc(ref done) => {
let final_token = match token {
TdsResponseToken::Done(_) | TdsResponseToken::DoneProc(_) => true,
_ => false,
Expand All @@ -216,7 +219,8 @@ impl<I: BoxableIo> Future for ExecFuture<I> {
}
// if this is the final done token, we need to reinject it for result set stream to handle it
// (as in querying, if self.single_token it already was reinjected and would result in an infinite cycle)
let reinject = !done.status.contains(DoneStatus::MORE) && !self.single_token
let reinject = !done.status.contains(DoneStatus::MORE)
&& !self.single_token
&& final_token;
if !reinject {
break;
Expand Down Expand Up @@ -252,7 +256,7 @@ impl<I: BoxableIo> StmtResult<I> for ExecFuture<I> {

/// A row in one resultset of a query
#[derive(Debug)]
pub struct QueryRow(TokenRow);
pub struct QueryRow(pub TokenRow);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks BC and also I'd like this not exposed since it's protocol internals.
(e.g. in new TDS versions row might change)


/// Anything that can be used as an index to get a specific row.
///
Expand Down Expand Up @@ -286,10 +290,7 @@ impl QueryRow {
}

/// Attempt to get a column's value for a given column index
pub fn try_get<'a, I: QueryIdx, R: FromColumnData<'a>>(
&'a self,
idx: I,
) -> Result<Option<R>> {
pub fn try_get<'a, I: QueryIdx, R: FromColumnData<'a>>(&'a self, idx: I) -> Result<Option<R>> {
let idx = match idx.to_idx(self) {
Some(x) => x,
None => return Ok(None),
Expand All @@ -298,6 +299,11 @@ impl QueryRow {
let col_data = &self.0.columns[idx];
R::from_column_data(col_data).map(Some)
}
pub fn iter(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exposes MetaDataColumn, which is an implementation specific representation.
Same as above applies.
Exposing this should be done using a NewType wrapper instead that exposes methods for data that we'll want and won't change (e.g. name).
Also I think we should use a slice for that instead of an iterator (similar to the postgres/mysql implementation).

&self,
) -> iter::Zip<slice::Iter<'_, MetaDataColumn>, slice::Iter<'_, ColumnData<'_>>> {
self.0.meta.columns.iter().zip(self.0.columns.iter())
}

/// Retrieve a column's value for a given column index
///
Expand Down
72 changes: 37 additions & 35 deletions tiberius/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
//! low level transport that deals with reading bytes from an underlying Io
//! handling data split accross packets, etc.
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use futures::{Async, Poll, Sink, StartSend};
use plp::{ReadTyMode, ReadTyState};
use protocol::{self, PacketHeader, PacketStatus};
use std::collections::VecDeque;
use std::fmt;
use std::io::{self, Cursor, Write};
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::str;
use tokio::io::{AsyncRead, AsyncWrite};
use bytes::{BufMut, Bytes, BytesMut};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use futures::{Async, Poll, Sink, StartSend};
use protocol::{self, PacketHeader, PacketStatus};
use plp::{ReadTyMode, ReadTyState};
use std::sync::Arc;
use tokens::{TdsResponseToken, TokenColMetaData, TokenEnvChange, Tokens};
use tokio::io::{AsyncRead, AsyncWrite};
use types::ColumnData;
use {FromUint, Error};
use {Error, FromUint};

pub trait Io: AsyncRead + AsyncWrite {}
impl<I: AsyncRead + AsyncWrite> Io for I {}
Expand All @@ -25,13 +25,13 @@ pub mod tls {
extern crate native_tls;
extern crate tokio_tls;

pub use self::tokio_tls::{Connect, TlsStream};
use futures::Poll;
use protocol::{self, PacketHeader, PacketStatus, PacketType};
use std::cmp;
use std::io::{self, Read, Write};
use futures::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
use protocol::{self, PacketHeader, PacketStatus, PacketType};
use transport::Io;
pub use self::tokio_tls::{Connect, TlsStream};
use Error;

impl From<native_tls::Error> for Error {
Expand Down Expand Up @@ -218,9 +218,10 @@ pub mod tls {
let mut builder = native_tls::TlsConnector::builder();

if disable_verification {
builder.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.use_sni(false);
builder
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.use_sni(false);
}

let cx = builder.build().unwrap();
Expand Down Expand Up @@ -396,11 +397,11 @@ impl<I: Io> TdsTransport<I> {
// read the associated length for a token, if available
if let Some(ReadState::Generic(token, None)) = self.read_state {
let new_state = match token {
Tokens::SSPI |
Tokens::EnvChange |
Tokens::Info |
Tokens::Error |
Tokens::LoginAck => ReadState::Generic(
Tokens::SSPI
| Tokens::EnvChange
| Tokens::Info
| Tokens::Error
| Tokens::LoginAck => ReadState::Generic(
token,
Some(self.inner.read_u16::<LittleEndian>()? as usize),
),
Expand Down Expand Up @@ -470,8 +471,8 @@ impl<I: Io> TdsTransport<I> {
TokenEnvChange::BeginTransaction(trans_id) => {
self.transaction = trans_id;
}
TokenEnvChange::RollbackTransaction(old_trans_id) |
TokenEnvChange::CommitTransaction(old_trans_id) => {
TokenEnvChange::RollbackTransaction(old_trans_id)
| TokenEnvChange::CommitTransaction(old_trans_id) => {
assert_eq!(self.transaction, old_trans_id);
self.transaction = 0;
}
Expand Down Expand Up @@ -569,7 +570,11 @@ impl<I: Io> TdsTransportInner<I> {
}

/// read byte string with or without PLP
pub fn read_plp_type(&mut self, state: &mut Option<ReadState>, mode: ReadTyMode) -> Poll<Option<Vec<u8>>, Error> {
pub fn read_plp_type(
&mut self,
state: &mut Option<ReadState>,
mode: ReadTyMode,
) -> Poll<Option<Vec<u8>>, Error> {
match *state {
Some(ReadState::Type(_)) => (),
_ => *state = Some(ReadState::Type(ReadTyState::new(mode))),
Expand Down Expand Up @@ -618,12 +623,10 @@ impl<I: Io> TdsTransportInner<I> {
while self.missing > 0 {
let amount = try_ready!(self.io.poll_read(&mut self.hrd[offset..]));
if amount == 0 {
return Err(
io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF during header retrieval",
).into(),
);
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF during header retrieval",
).into());
}
self.missing -= amount;
offset += amount;
Expand Down Expand Up @@ -653,19 +656,18 @@ impl<I: Io> TdsTransportInner<I> {
}
};
unsafe {
let count_result = self.io.poll_read(&mut write_buf.bytes_mut()[..self.missing]);
let count_result = self.io
.poll_read(&mut write_buf.bytes_mut()[..self.missing]);
if let Ok(Async::Ready(count)) = count_result {
write_buf.advance_mut(count);
}
mem::replace(self.rd.get_mut(), write_buf.freeze());
self.missing -= match try_ready!(count_result) {
0 => {
return Err(
io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF in packet body",
).into(),
)
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF in packet body",
).into())
}
count => count,
};
Expand Down
Loading