Source code for pytds.tds_base

"""
.. module:: tds_base
   :platform: Unix, Windows, MacOSX
   :synopsis: Various internal stuff

.. moduleauthor:: Mikhail Denisenko <denisenkom@gmail.com>
"""
from __future__ import annotations

import datetime
import logging
import socket
import struct
import typing
from collections import deque
from typing import Protocol, Iterable, TypedDict, Tuple, Any

import pytds
from pytds.collate import ucs2_codec

logger = logging.getLogger("pytds")

# tds protocol versions
# Integer codes that the IS_TDS*_PLUS helpers in this module compare a
# session's ``tds_version`` against using ``>=``.
TDS70 = 0x70000000
TDS71 = 0x71000000
TDS71rev1 = 0x71000001
TDS72 = 0x72090002
TDS73A = 0x730A0003
TDS73 = TDS73A  # alias: the unqualified 7.3 name resolves to revision A
TDS73B = 0x730B0003
TDS74 = 0x74000004


if typing.TYPE_CHECKING:
    from pytds.tds_session import _TdsSession


def IS_TDS7_PLUS(x: _TdsSession):
    """Report whether ``x.tds_version`` is at least ``TDS70``."""
    return TDS70 <= x.tds_version


def IS_TDS71_PLUS(x: _TdsSession):
    """Report whether ``x.tds_version`` is at least ``TDS71``."""
    return TDS71 <= x.tds_version


def IS_TDS72_PLUS(x: _TdsSession):
    """Report whether ``x.tds_version`` is at least ``TDS72``."""
    return TDS72 <= x.tds_version


def IS_TDS73_PLUS(x: _TdsSession):
    """Report whether ``x.tds_version`` is at least ``TDS73A``."""
    return TDS73A <= x.tds_version


# https://msdn.microsoft.com/en-us/library/dd304214.aspx
class PacketType:
    """
    Numeric packet type codes, grouped in a plain class that is used as a
    namespace (the attributes are ordinary ints, not enum members).

    The URL in the comment directly above this class is the reference for
    the values.
    """

    QUERY = 1
    OLDLOGIN = 2
    RPC = 3
    REPLY = 4
    CANCEL = 6
    BULK = 7
    FEDAUTHTOKEN = 8
    TRANS = 14  # transaction management
    LOGIN = 16
    AUTH = 17
    PRELOGIN = 18


# mssql login options flags
# option_flag1_values
# The names assigned 0 add nothing when OR-ed into a flag value; presumably
# they exist to spell out the "bit not set" choice next to its counterpart
# (e.g. TDS_SET_LANG_OFF / TDS_SET_LANG_ON).
TDS_BYTE_ORDER_X86 = 0
TDS_CHARSET_ASCII = 0
TDS_DUMPLOAD_ON = 0
TDS_FLOAT_IEEE_754 = 0
TDS_INIT_DB_WARN = 0
TDS_SET_LANG_OFF = 0
TDS_USE_DB_SILENT = 0
TDS_BYTE_ORDER_68000 = 0x01
TDS_CHARSET_EBDDIC = 0x02  # (sic) presumably EBCDIC; name left as is because it is public
TDS_FLOAT_VAX = 0x04
TDS_FLOAT_ND5000 = 0x08
TDS_DUMPLOAD_OFF = 0x10  # prevent BCP
TDS_USE_DB_NOTIFY = 0x20
TDS_INIT_DB_FATAL = 0x40
TDS_SET_LANG_ON = 0x80

# enum option_flag2_values
TDS_INIT_LANG_WARN = 0
TDS_INTEGRATED_SECURTY_OFF = 0  # (sic) "SECURTY"; the ON counterpart below is spelled SECURITY
TDS_ODBC_OFF = 0
TDS_USER_NORMAL = 0  # SQL Server login
TDS_INIT_LANG_REQUIRED = 0x01
TDS_ODBC_ON = 0x02
TDS_TRANSACTION_BOUNDARY71 = 0x04  # removed in TDS 7.2
TDS_CACHE_CONNECT71 = 0x08  # removed in TDS 7.2
TDS_USER_SERVER = 0x10  # reserved
TDS_USER_REMUSER = 0x20  # DQ login
TDS_USER_SQLREPL = 0x40  # replication login
TDS_INTEGRATED_SECURITY_ON = 0x80

# enum option_flag3_values TDS 7.3+
TDS_RESTRICTED_COLLATION = 0
TDS_CHANGE_PASSWORD = 0x01
TDS_SEND_YUKON_BINARY_XML = 0x02
TDS_REQUEST_USER_INSTANCE = 0x04
TDS_UNKNOWN_COLLATION_HANDLING = 0x08
TDS_ANY_COLLATION = 0x10

# Token identifiers.  Each trailing comment repeats the decimal value in hex,
# followed by the protocol version note carried over from the original table.
TDS5_PARAMFMT2_TOKEN = 32  # 0x20
TDS_LANGUAGE_TOKEN = 33  # 0x21    TDS 5.0 only
TDS_ORDERBY2_TOKEN = 34  # 0x22
TDS_ROWFMT2_TOKEN = 97  # 0x61    TDS 5.0 only
TDS_LOGOUT_TOKEN = 113  # 0x71    TDS 5.0 only?
TDS_RETURNSTATUS_TOKEN = 121  # 0x79
TDS_PROCID_TOKEN = 124  # 0x7C    TDS 4.2 only
TDS7_RESULT_TOKEN = 129  # 0x81    TDS 7.0 only
TDS7_COMPUTE_RESULT_TOKEN = 136  # 0x88    TDS 7.0 only
TDS_COLNAME_TOKEN = 160  # 0xA0    TDS 4.2 only
TDS_COLFMT_TOKEN = 161  # 0xA1    TDS 4.2 only
TDS_DYNAMIC2_TOKEN = 163  # 0xA3
TDS_TABNAME_TOKEN = 164  # 0xA4
TDS_COLINFO_TOKEN = 165  # 0xA5
TDS_OPTIONCMD_TOKEN = 166  # 0xA6
TDS_COMPUTE_NAMES_TOKEN = 167  # 0xA7
TDS_COMPUTE_RESULT_TOKEN = 168  # 0xA8
TDS_ORDERBY_TOKEN = 169  # 0xA9
TDS_ERROR_TOKEN = 170  # 0xAA
TDS_INFO_TOKEN = 171  # 0xAB
TDS_PARAM_TOKEN = 172  # 0xAC
TDS_LOGINACK_TOKEN = 173  # 0xAD
TDS_CONTROL_TOKEN = 174  # 0xAE
TDS_ROW_TOKEN = 209  # 0xD1
TDS_NBC_ROW_TOKEN = 210  # 0xD2    as of TDS 7.3.B
TDS_CMP_ROW_TOKEN = 211  # 0xD3
TDS5_PARAMS_TOKEN = 215  # 0xD7    TDS 5.0 only
TDS_CAPABILITY_TOKEN = 226  # 0xE2
TDS_ENVCHANGE_TOKEN = 227  # 0xE3
TDS_DBRPC_TOKEN = 230  # 0xE6
TDS5_DYNAMIC_TOKEN = 231  # 0xE7    TDS 5.0 only
TDS5_PARAMFMT_TOKEN = 236  # 0xEC    TDS 5.0 only
TDS_AUTH_TOKEN = 237  # 0xED    TDS 7.0 only
TDS_RESULT_TOKEN = 238  # 0xEE
TDS_DONE_TOKEN = 253  # 0xFD
TDS_DONEPROC_TOKEN = 254  # 0xFE
TDS_DONEINPROC_TOKEN = 255  # 0xFF

# CURSOR support: TDS 5.0 only
# Note: TDS_CURDELETE_TOKEN has the same numeric value (129) as
# TDS7_RESULT_TOKEN above, so the two cannot be told apart by value alone.
TDS_CURCLOSE_TOKEN = 128  # 0x80    TDS 5.0 only
TDS_CURDELETE_TOKEN = 129  # 0x81    TDS 5.0 only
TDS_CURFETCH_TOKEN = 130  # 0x82    TDS 5.0 only
TDS_CURINFO_TOKEN = 131  # 0x83    TDS 5.0 only
TDS_CUROPEN_TOKEN = 132  # 0x84    TDS 5.0 only
TDS_CURDECLARE_TOKEN = 134  # 0x86    TDS 5.0 only

# environment type field
TDS_ENV_DATABASE = 1
TDS_ENV_LANG = 2
TDS_ENV_CHARSET = 3
TDS_ENV_PACKSIZE = 4
TDS_ENV_LCID = 5
TDS_ENV_UNICODE_DATA_SORT_COMP_FLAGS = 6
TDS_ENV_SQLCOLLATION = 7
TDS_ENV_BEGINTRANS = 8
TDS_ENV_COMMITTRANS = 9
TDS_ENV_ROLLBACKTRANS = 10
TDS_ENV_ENLIST_DTC_TRANS = 11
TDS_ENV_DEFECT_TRANS = 12
TDS_ENV_DB_MIRRORING_PARTNER = 13
TDS_ENV_PROMOTE_TRANS = 15
TDS_ENV_TRANS_MANAGER_ADDR = 16
TDS_ENV_TRANS_ENDED = 17
TDS_ENV_RESET_COMPLETION_ACK = 18
TDS_ENV_INSTANCE_INFO = 19
TDS_ENV_ROUTING = 20

# Microsoft internal stored procedure id's
TDS_SP_CURSOR = 1
TDS_SP_CURSOROPEN = 2
TDS_SP_CURSORPREPARE = 3
TDS_SP_CURSOREXECUTE = 4
TDS_SP_CURSORPREPEXEC = 5
TDS_SP_CURSORUNPREPARE = 6
TDS_SP_CURSORFETCH = 7
TDS_SP_CURSOROPTION = 8
TDS_SP_CURSORCLOSE = 9
TDS_SP_EXECUTESQL = 10
TDS_SP_PREPARE = 11
TDS_SP_EXECUTE = 12
TDS_SP_PREPEXEC = 13
TDS_SP_PREPEXECRPC = 14
TDS_SP_UNPREPARE = 15

# Flags returned in TDS_DONE token
# All values except TDS_DONE_FINAL are distinct single bits, so they can be
# tested with a bitwise AND; TDS_DONE_FINAL is simply "no bits set".
TDS_DONE_FINAL = 0
TDS_DONE_MORE_RESULTS = 0x01  # more results follow
TDS_DONE_ERROR = 0x02  # error occurred
TDS_DONE_INXACT = 0x04  # transaction in progress
TDS_DONE_PROC = 0x08  # results are from a stored procedure
TDS_DONE_COUNT = 0x10  # count field in packet is valid
TDS_DONE_CANCELLED = 0x20  # acknowledging an attention command (usually a cancel)
TDS_DONE_EVENT = 0x40  # part of an event notification.
TDS_DONE_SRVERROR = 0x100  # SQL Server server error (the only flag above the low byte)


SYBVOID = 31  # 0x1F
IMAGETYPE = SYBIMAGE = 34  # 0x22
TEXTTYPE = SYBTEXT = 35  # 0x23
SYBVARBINARY = 37  # 0x25
INTNTYPE = SYBINTN = 38  # 0x26
SYBVARCHAR = 39  # 0x27
BINARYTYPE = SYBBINARY = 45  # 0x2D
SYBCHAR = 47  # 0x2F
INT1TYPE = SYBINT1 = 48  # 0x30
BITTYPE = SYBBIT = 50  # 0x32
INT2TYPE = SYBINT2 = 52  # 0x34
INT4TYPE = SYBINT4 = 56  # 0x38
DATETIM4TYPE = SYBDATETIME4 = 58  # 0x3A
FLT4TYPE = SYBREAL = 59  # 0x3B
MONEYTYPE = SYBMONEY = 60  # 0x3C
DATETIMETYPE = SYBDATETIME = 61  # 0x3D
FLT8TYPE = SYBFLT8 = 62  # 0x3E
NTEXTTYPE = SYBNTEXT = 99  # 0x63
SYBNVARCHAR = 103  # 0x67
BITNTYPE = SYBBITN = 104  # 0x68
NUMERICNTYPE = SYBNUMERIC = 108  # 0x6C
DECIMALNTYPE = SYBDECIMAL = 106  # 0x6A
FLTNTYPE = SYBFLTN = 109  # 0x6D
MONEYNTYPE = SYBMONEYN = 110  # 0x6E
DATETIMNTYPE = SYBDATETIMN = 111  # 0x6F
MONEY4TYPE = SYBMONEY4 = 122  # 0x7A

INT8TYPE = SYBINT8 = 127  # 0x7F
BIGCHARTYPE = XSYBCHAR = 175  # 0xAF
BIGVARCHRTYPE = XSYBVARCHAR = 167  # 0xA7
NVARCHARTYPE = XSYBNVARCHAR = 231  # 0xE7
NCHARTYPE = XSYBNCHAR = 239  # 0xEF
BIGVARBINTYPE = XSYBVARBINARY = 165  # 0xA5
BIGBINARYTYPE = XSYBBINARY = 173  # 0xAD
GUIDTYPE = SYBUNIQUE = 36  # 0x24
SSVARIANTTYPE = SYBVARIANT = 98  # 0x62
UDTTYPE = SYBMSUDT = 240  # 0xF0
XMLTYPE = SYBMSXML = 241  # 0xF1
TVPTYPE = 243  # 0xF3
DATENTYPE = SYBMSDATE = 40  # 0x28
TIMENTYPE = SYBMSTIME = 41  # 0x29
DATETIME2NTYPE = SYBMSDATETIME2 = 42  # 0x2a
DATETIMEOFFSETNTYPE = SYBMSDATETIMEOFFSET = 43  # 0x2b

# TDS type flag
TDS_FSQLTYPE_SQL_DFLT = 0x00
TDS_FSQLTYPE_SQL_TSQL = 0x01
TDS_FOLEDB = 0x10
TDS_FREADONLY_INTENT = 0x20

#
# Sybase only types
#
# Several codes below reuse numbers that the MSSQL type table earlier in this
# module assigns to other names, so a bare type code is ambiguous between the
# two sets:
#   SYBBLOB (36) == SYBUNIQUE, SYBBOUNDARY (104) == SYBBITN,
#   SYBLONGCHAR (175) == XSYBCHAR, SYBSENSITIVITY (103) == SYBNVARCHAR.
SYBLONGBINARY = 225  # 0xE1
SYBUINT1 = 64  # 0x40
SYBUINT2 = 65  # 0x41
SYBUINT4 = 66  # 0x42
SYBUINT8 = 67  # 0x43
SYBBLOB = 36  # 0x24
SYBBOUNDARY = 104  # 0x68
SYBDATE = 49  # 0x31
SYBDATEN = 123  # 0x7B
SYB5INT8 = 191  # 0xBF
SYBINTERVAL = 46  # 0x2E
SYBLONGCHAR = 175  # 0xAF
SYBSENSITIVITY = 103  # 0x67
SYBSINT1 = 176  # 0xB0
SYBTIME = 51  # 0x33
SYBTIMEN = 147  # 0x93
SYBUINTN = 68  # 0x44
SYBUNITEXT = 174  # 0xAE
SYBXML = 163  # 0xA3

TDS_UT_TIMESTAMP = 80

# compute operator
SYBAOPCNT = 0x4B
SYBAOPCNTU = 0x4C
SYBAOPSUM = 0x4D
SYBAOPSUMU = 0x4E
SYBAOPAVG = 0x4F
SYBAOPAVGU = 0x50
SYBAOPMIN = 0x51
SYBAOPMAX = 0x52

# mssql2k compute operator
SYBAOPCNT_BIG = 0x09
SYBAOPSTDEV = 0x30
SYBAOPSTDEVP = 0x31
SYBAOPVAR = 0x32
SYBAOPVARP = 0x33
SYBAOPCHECKSUM_AGG = 0x72

# param flags
fByRefValue = 1
fDefaultValue = 2

# Session state codes.  They are consecutive from 0 so that each one can be
# used directly as an index into ``state_names`` below.
TDS_IDLE = 0
TDS_QUERYING = 1
TDS_PENDING = 2
TDS_READING = 3
TDS_DEAD = 4
# Printable names, ordered so that state_names[TDS_IDLE] == "IDLE", etc.
state_names = ["IDLE", "QUERYING", "PENDING", "READING", "DEAD"]

TDS_ENCRYPTION_OFF = 0
TDS_ENCRYPTION_REQUEST = 1
TDS_ENCRYPTION_REQUIRE = 2


class PreLoginToken:
    """
    Option identifiers of the PRELOGIN message; these correspond to
    PL_OPTION_TOKEN in the spec.

    Spec link: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/60f56408-0188-4cd5-8b90-25c6f2423868
    """

    VERSION = 0
    ENCRYPTION = 1
    INSTOPT = 2
    THREADID = 3
    MARS = 4
    TRACEID = 5
    FEDAUTHREQUIRED = 6
    NONCEOPT = 7
    TERMINATOR = 0xFF
class PreLoginEnc:
    """
    Values of the encryption option exchanged in the PRELOGIN message.

    Spec link: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/60f56408-0188-4cd5-8b90-25c6f2423868
    """

    ENCRYPT_OFF = 0  # Encryption available but off
    ENCRYPT_ON = 1  # Encryption available and on
    ENCRYPT_NOT_SUP = 2  # Encryption not available
    ENCRYPT_REQ = 3  # Encryption required
PLP_MARKER = 0xFFFF
PLP_NULL = 0xFFFFFFFFFFFFFFFF
PLP_UNKNOWN = 0xFFFFFFFFFFFFFFFE

TDS_NO_COUNT = -1

TVP_NULL_TOKEN = 0xFFFF

# TVP COLUMN FLAGS
TVP_COLUMN_DEFAULT_FLAG = 0x200

TVP_END_TOKEN = 0x00
TVP_ROW_TOKEN = 0x01
TVP_ORDER_UNIQUE_TOKEN = 0x10
TVP_COLUMN_ORDERING_TOKEN = 0x11


class CommonEqualityMixin:
    """
    Mixin giving value semantics to ``==`` and ``!=``.

    Two objects are equal when the right-hand one is an instance of the
    left-hand one's class and both carry identical instance dictionaries.
    Because ``__eq__`` is defined without ``__hash__``, instances are not
    hashable.
    """

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        # Defined explicitly as the exact negation of __eq__
        equal = self.__eq__(other)
        return not equal
def iterdecode(iterable, codec):
    """Decode a stream of byte chunks incrementally.

    This function is a generator.  It yields one decoded string per input
    chunk, followed by one last string produced by flushing the decoder.

    :param iterable: Iterable object which yields raw data to be decoded.
    :param codec: Codec object whose ``incrementaldecoder`` factory is used
                  to create the decoder.
    """
    incremental = codec.incrementaldecoder()
    yield from map(incremental.decode, iterable)
    # final=True flushes whatever the decoder still buffers
    yield incremental.decode(b"", True)
def force_unicode(s):
    """
    Return ``s`` as a ``str``.

    ``str`` input is returned unchanged, ``bytes`` input is decoded as UTF-8
    and anything else goes through ``str()``.

    :raises DatabaseError: if ``s`` is ``bytes`` that are not valid UTF-8.
    """
    if isinstance(s, str):
        return s
    if not isinstance(s, bytes):
        return str(s)
    try:
        return s.decode("utf8")
    except UnicodeDecodeError as e:
        raise DatabaseError(e)
def tds_quote_id(ident):
    """Quote an identifier according to MSSQL rules.

    Every ``]`` in the identifier is doubled and the result is wrapped in
    square brackets.

    :param ident: identifier to quote
    :returns: Quoted identifier
    """
    escaped = ident.replace("]", "]]")
    return "[{0}]".format(escaped)
# store a tuple of programming error codes
prog_errors = (
    102,  # syntax error
    207,  # invalid column name
    208,  # invalid object name
    2812,  # unknown procedure
    4104,  # multi-part identifier could not be bound
)

# store a tuple of integrity error codes
integrity_errors = (
    515,  # NULL insert
    547,  # FK related
    2601,  # violate unique index
    2627,  # violate UNIQUE KEY constraint
)


def my_ord(val):
    """Identity function: hand back ``val`` untouched."""
    return val


def join_bytearrays(ba):
    """Concatenate an iterable of bytes-like chunks into one ``bytes``."""
    empty = b""
    return empty.join(ba)


# exception hierarchy
class Warning(Exception):
    """
    Warning member of this module's exception hierarchy.

    Derives directly from :class:`Exception`; inside this module the name
    shadows the builtin ``Warning``.
    """
class Error(Exception):
    """
    Root of every error class defined here, with the exception of
    TimeoutError.
    """
# Expose the socket timeout exception under this module's own name.  It is
# deliberately outside the Error hierarchy (see the Error docstring).
# NOTE(review): the Python docs state that from 3.10 on ``socket.timeout`` is
# an alias of the builtin ``TimeoutError`` -- confirm for supported versions.
TimeoutError = socket.timeout
class InterfaceError(Error):
    """
    Direct subclass of :class:`Error`; :class:`ClosedConnectionError` is
    derived from it.

    NOTE(review): presumably the counterpart of PEP 249's InterfaceError
    (problems with the database interface rather than the database itself)
    -- confirm before relying on that reading.
    """
class DatabaseError(Error):
    """
    Raised when the MSSQL server returns an error that includes an error
    number.

    The server-side details (``number``, ``severity``, ``state``, ``line``,
    ``procname``, ``srvname``, ``msg_no``) start out zeroed/empty and are
    expected to be filled in by whoever builds the exception.
    """

    def __init__(self, msg: str, exc: typing.Any | None = None):
        # Both values are forwarded, so ``self.args`` is the pair (msg, exc)
        super().__init__(msg, exc)
        self.text = msg
        self.srvname = ""
        self.procname = ""
        self.msg_no = 0
        self.number = 0
        self.severity = 0
        self.state = 0
        self.line = 0

    @property
    def message(self):
        """Human readable summary; names the procedure only when one is set."""
        if not self.procname:
            return "SQL Server message %d, severity %d, state %d, " "line %d:\n%s" % (
                self.number,
                self.severity,
                self.state,
                self.line,
                self.text,
            )
        summary = (
            "SQL Server message %d, severity %d, state %d, "
            "procedure %s, line %d:\n%s"
        )
        return summary % (
            self.number,
            self.severity,
            self.state,
            self.procname,
            self.line,
            self.text,
        )
class ClosedConnectionError(InterfaceError):
    """
    Raised when the MSSQL server closes the connection.

    Takes no arguments; the message is always "Server closed connection".
    """

    def __init__(self):
        super().__init__("Server closed connection")
class DataError(Error):
    """
    Raised when an input parameter holds data that cannot be converted to an
    acceptable data type.
    """
class OperationalError(DatabaseError):
    """
    Database error that ``_create_exception_by_message`` falls back to when
    the server message number is listed in neither ``prog_errors`` nor
    ``integrity_errors``.  Also the base class of :class:`LoginError`.
    """
class LoginError(OperationalError):
    """
    Raised when the supplied login credentials are invalid.
    """
class IntegrityError(DatabaseError):
    """
    Database error that ``_create_exception_by_message`` produces for server
    message numbers listed in ``integrity_errors`` (NULL insert, foreign key
    and unique index/key violations).
    """
class InternalError(DatabaseError):
    """
    Subclass of :class:`DatabaseError` that adds no behaviour of its own.

    NOTE(review): no raise site is visible next to this definition;
    presumably it exists to supply the PEP 249 exception of the same name
    -- confirm.
    """
class ProgrammingError(DatabaseError):
    """
    Database error that ``_create_exception_by_message`` produces for server
    message numbers listed in ``prog_errors`` (syntax errors, unknown
    columns, objects or procedures, unbound multi-part identifiers).
    """
class NotSupportedError(DatabaseError):
    """
    Subclass of :class:`DatabaseError` that adds no behaviour of its own.

    NOTE(review): no raise site is visible next to this definition;
    presumably it exists to supply the PEP 249 exception of the same name
    -- confirm.
    """
# DB-API type definitions
class DBAPITypeObject:
    """
    Groups a set of type codes under a single comparable object.

    An instance compares equal to each of the values it was constructed
    with and unequal to everything else; the module-level objects such as
    ``STRING`` or ``BINARY`` are built from it.

    Instances are not hashable, because ``__eq__`` is defined without
    ``__hash__``.

    :param values: the type codes this object stands for
    """

    def __init__(self, *values):
        self.values = set(values)

    def __eq__(self, other):
        try:
            return other in self.values
        except TypeError:
            # An unhashable operand (list, dict, ...) can never be a member
            # of the set; report inequality instead of leaking the TypeError
            # out of a plain ``==`` comparison.
            return False

    def __cmp__(self, other):
        # Python 2 comparison hook.  Python 3 never calls it implicitly; it
        # is kept unchanged for any code that still invokes it explicitly.
        # NOTE(review): the ``<`` between ``other`` and a set only works for
        # operands that support ordering against sets.
        if other in self.values:
            return 0
        if other < self.values:
            return 1
        else:
            return -1
# standard dbapi type objects
STRING = DBAPITypeObject(
    SYBVARCHAR,
    SYBCHAR,
    SYBTEXT,
    XSYBNVARCHAR,
    XSYBNCHAR,
    SYBNTEXT,
    XSYBVARCHAR,
    XSYBCHAR,
    SYBMSXML,
)
BINARY = DBAPITypeObject(
    SYBIMAGE,
    SYBBINARY,
    SYBVARBINARY,
    XSYBVARBINARY,
    XSYBBINARY,
)
NUMBER = DBAPITypeObject(
    SYBBIT,
    SYBBITN,
    SYBINT1,
    SYBINT2,
    SYBINT4,
    SYBINT8,
    SYBINTN,
    SYBREAL,
    SYBFLT8,
    SYBFLTN,
)
DATETIME = DBAPITypeObject(
    SYBDATETIME,
    SYBDATETIME4,
    SYBDATETIMN,
)
DECIMAL = DBAPITypeObject(
    SYBMONEY,
    SYBMONEY4,
    SYBMONEYN,
    SYBNUMERIC,
    SYBDECIMAL,
)
# no type code maps to ROWID, so it never compares equal to anything
ROWID = DBAPITypeObject()

# non-standard, but useful type objects
INTEGER = DBAPITypeObject(
    SYBBIT,
    SYBBITN,
    SYBINT1,
    SYBINT2,
    SYBINT4,
    SYBINT8,
    SYBINTN,
)
REAL = DBAPITypeObject(
    SYBREAL,
    SYBFLT8,
    SYBFLTN,
)
XML = DBAPITypeObject(SYBMSXML)
class InternalProc:
    """
    Pairs the numeric id of an internal stored procedure with its name.

    :param proc_id: numeric procedure id (the module defines ``TDS_SP_*``
                    constants for these)
    :param name: procedure name, e.g. ``"sp_executesql"``
    """

    def __init__(self, proc_id, name):
        self.proc_id = proc_id
        self.name = name

    def __str__(self):
        # Python 3 only consults __str__; without it str(proc) would be the
        # default object repr instead of the procedure name.
        return self.name

    def __unicode__(self):
        # Python 2 era hook, kept for callers that invoke it explicitly
        return self.name
# Ready-made descriptors (id + name) for three internal stored procedures
SP_EXECUTESQL = InternalProc(
    TDS_SP_EXECUTESQL,
    "sp_executesql",
)
SP_PREPARE = InternalProc(
    TDS_SP_PREPARE,
    "sp_prepare",
)
SP_EXECUTE = InternalProc(
    TDS_SP_EXECUTE,
    "sp_execute",
)
def skipall(stm, size):
    """Discard exactly ``size`` bytes from ``stm``.

    ``stm.recv`` is called repeatedly, each time asking for the number of
    bytes still outstanding, until ``size`` bytes have been consumed.

    :param stm: Stream to skip bytes in; must provide a ``recv`` method,
                which is allowed to return fewer bytes than requested.
    :param size: Number of bytes to skip.
    :raises ClosedConnectionError: if ``recv`` returns no data before
            ``size`` bytes were skipped.
    """
    remaining = size
    while True:
        piece = stm.recv(remaining)
        remaining -= len(piece)
        if not remaining:
            return
        if len(piece) == 0:
            # nothing arrived although bytes are still outstanding: EOF
            raise ClosedConnectionError()
def read_chunks(stm, size):
    """Read exactly ``size`` bytes from ``stm``, yielding them chunk by chunk.

    This is a generator.  ``stm.recv`` may be called several times; every
    non-empty result is yielded as-is until ``size`` bytes were produced.
    For ``size == 0`` a single empty ``bytes`` object is yielded and the
    stream is not touched.

    :param stm: Stream to read bytes from; must provide a ``recv`` method,
                which is allowed to return fewer bytes than requested.
    :param size: Number of bytes to read.
    :raises ClosedConnectionError: if ``recv`` returns no data before
            ``size`` bytes were read.
    """
    if size == 0:
        yield b""
        return
    remaining = size
    while True:
        chunk = stm.recv(remaining)
        if len(chunk) == 0:
            raise ClosedConnectionError()
        yield chunk
        remaining -= len(chunk)
        if not remaining:
            break
def readall(stm, size):
    """Read exactly ``size`` bytes from ``stm`` and return them as one buffer.

    Thin wrapper that joins everything produced by :func:`read_chunks`.

    :param stm: Stream to read bytes from; must provide a ``recv`` method,
                which is allowed to return fewer bytes than requested.
    :param size: Number of bytes to read.
    :returns: Bytes buffer of exactly given size.
    :raises ClosedConnectionError: if the stream ends before ``size`` bytes
            were read.
    """
    chunks = read_chunks(stm, size)
    return join_bytearrays(chunks)
def readall_fast(stm, size):
    """
    Slightly faster variant of readall that reads no more than two chunks,
    so it is only suitable for small data that does not span more than two
    packets.

    :param stm: Stream to read from; must provide ``read_fast(size)``
                returning a ``(buffer, offset)`` pair, and ``recv``.
    :param size: Number of bytes to read.
    :return: ``(buffer, offset)`` pair; the requested data starts at
             ``offset`` inside ``buffer``.
    """
    buf, offset = stm.read_fast(size)
    available = len(buf) - offset
    if available >= size:
        return buf, offset
    # slow case: keep what is already there and top it up with a single recv
    head = buf[offset:]
    head += stm.recv(size - len(head))
    return head, 0
def total_seconds(td):
    """Whole number of seconds in a timedelta object.

    Written as a backport of ``timedelta.total_seconds`` for Python 2.6.
    Only the ``days`` and ``seconds`` fields are used, so the result is an
    ``int`` and the ``microseconds`` field is ignored.
    """
    seconds_per_day = 24 * 60 * 60
    return td.days * seconds_per_day + td.seconds
class Param:
    """
    Describes a typed parameter.  It can be used to state the type of a
    parameter of a parametrized query explicitly.

    :param name: Optional name of the parameter
    :type name: str
    :param type: Type of the parameter, e.g. :class:`pytds.tds_types.IntType`
    :param value: Value stored with the parameter
    :param flags: Integer flags stored with the parameter; presumably a
                  combination of the module's ``fByRefValue`` /
                  ``fDefaultValue`` constants -- confirm against callers
    """

    def __init__(self, name: str = "", type=None, value=None, flags: int = 0):
        self.name, self.type = name, type
        self.value, self.flags = value, flags
class Column(CommonEqualityMixin):
    """
    Describes a table column.  Can be used to define the schema for a bulk
    insert.

    Following flags can be used for columns in `flags` parameter:

    * :const:`.fNullable` - column can contain `NULL` values
    * :const:`.fCaseSen` - column is case-sensitive
    * :const:`.fReadWrite` - TODO document
    * :const:`.fIdentity` - TODO document
    * :const:`.fComputed` - TODO document

    :param name: Name of the column
    :type name: str
    :param type: Type of a column, e.g. :class:`pytds.tds_types.IntType`
    :param flags: Combination of flags for the column, multiple flags can be
                  combined using binary or operator. Possible flags are
                  described above.
    :param value: Value stored with the column
    """

    fNullable = 1
    fCaseSen = 2
    fReadWrite = 8
    fIdentity = 0x10
    fComputed = 0x20

    def __init__(self, name="", type=None, flags=fNullable, value=None):
        self.column_name = name
        self.type = type
        self.flags = flags
        self.value = value
        # not derived from the arguments; start out unset
        self.column_usertype = 0
        self.char_codec = None
        self.serializer = None

    def __repr__(self):
        # bytes/str values longer than 100 items are cut and tagged with
        # their full length to keep the representation short
        shown = self.value
        if isinstance(shown, bytes):
            if len(shown) > 100:
                shown = shown[:100] + b"... len is " + str(len(shown)).encode("ascii")
        elif isinstance(shown, str):
            if len(shown) > 100:
                shown = shown[:100] + "... len is " + str(len(shown))
        fields = (
            self.column_name,
            self.type,
            shown,
            self.flags,
            self.column_usertype,
            self.char_codec,
        )
        template = "<Column(name={},type={},value={},flags={},user_type={},codec={})>"
        return template.format(*map(repr, fields))

    def choose_serializer(self, type_factory, collation):
        """
        Ask ``type_factory`` for the serializer matching this column's type.

        :param type_factory: object providing ``serializer_by_type``
        :param collation: collation passed through to the factory
        :returns: whatever ``type_factory.serializer_by_type`` returns
        """
        return type_factory.serializer_by_type(
            sql_type=self.type,
            collation=collation,
        )
class TransportProtocol(Protocol):
    """
    Structural type for transports; it mimics the socket interface.
    """

    # def is_connected(self) -> bool:
    #     ...

    def close(self) -> None:
        """Counterpart of the socket method of the same name."""

    def gettimeout(self) -> float | None:
        """Counterpart of the socket method of the same name."""

    def settimeout(self, timeout: float | None) -> None:
        """Counterpart of the socket method of the same name."""

    def sendall(self, buf: bytes, flags: int = 0) -> None:
        """Counterpart of the socket method of the same name."""

    def recv(self, size: int) -> bytes:
        """Counterpart of the socket method of the same name."""

    def recv_into(
        self, buf: bytearray | memoryview, size: int = 0, flags: int = 0
    ) -> int:
        """Counterpart of the socket method of the same name."""
class LoadBalancer(Protocol):
    """Structural type for objects that can supply candidate host names."""

    def choose(self) -> Iterable[str]:
        """Return an iterable of strings; the protocol itself returns nothing."""
class AuthProtocol(Protocol):
    """Structural type for authentication handlers."""

    def create_packet(self) -> bytes:
        """Produce a packet as bytes."""

    def handle_next(self, packet: bytes) -> bytes | None:
        """Consume ``packet``; answer with bytes, or with ``None``."""

    def close(self) -> None:
        """Release whatever the handler holds."""
# packet header
# https://msdn.microsoft.com/en-us/library/dd340948.aspx
_header = struct.Struct(">BBHHBx")

# Pre-compiled struct formats; the suffix gives the byte order
# (_le = little endian, _be = big endian)
_byte = struct.Struct("B")
_smallint_le = struct.Struct("<h")
_smallint_be = struct.Struct(">h")
_usmallint_le = struct.Struct("<H")
_usmallint_be = struct.Struct(">H")
_int_le = struct.Struct("<l")
_int_be = struct.Struct(">l")
_uint_le = struct.Struct("<L")
_uint_be = struct.Struct(">L")
_int8_le = struct.Struct("<q")
_int8_be = struct.Struct(">q")
_uint8_le = struct.Struct("<Q")
_uint8_be = struct.Struct(">Q")

logging_enabled = False


# stored procedure output parameter
class output:
    def __init__(self, value: Any = None, param_type=None):
        """Creates procedure output parameter.

        :param param_type: either sql type declaration or python type
        :param value: value to pass into procedure
        :raises ValueError: if no type is given and none can be taken from
                ``value`` (``None`` or ``default``), or if ``param_type`` is
                a Python type and a concrete ``value`` is not an instance of
                it.
        """
        has_concrete_value = value is not None and value is not default
        if param_type is None:
            if not has_concrete_value:
                raise ValueError("Output type cannot be autodetected")
        elif (
            isinstance(param_type, type)
            and has_concrete_value
            and not isinstance(value, param_type)
        ):
            raise ValueError(
                "value should match param_type, value is {}, param_type is '{}'".format(
                    repr(value), param_type.__name__
                )
            )
        self._type = param_type
        self._value = value

    @property
    def type(self):
        """
        Either the sql type declaration or the python type instance of the
        parameter.
        """
        return self._type

    @property
    def value(self):
        """
        The value of the parameter.
        """
        return self._value


class _Default:
    """Type of the ``default`` marker object below."""


# Marker value; ``output`` treats it like an absent value when checking types
default = _Default()
def tds7_crypt_pass(password: str) -> bytearray:
    """Mangle password according to tds rules

    The password is encoded with ``ucs2_codec``; then, for every byte, the
    two 4-bit halves are swapped and the result is XOR-ed with 0xA5.

    :param password: Password str
    :returns: Byte-string with encoded password
    """
    raw = ucs2_codec.encode(password)[0]
    return bytearray(
        ((((octet << 4) & 0xFF) | (octet >> 4)) ^ 0xA5) for octet in bytearray(raw)
    )
class _TdsLogin:
    """Plain container for login settings; every field starts at its default."""

    def __init__(self) -> None:
        self.client_host_name = ""
        self.library = ""
        self.server_name = ""
        self.instance_name = ""
        self.user_name = ""
        self.password = ""
        self.app_name = ""
        self.port: int | None = None
        self.language = ""
        self.attach_db_file = ""
        self.tds_version = TDS74
        self.database = ""
        self.bulk_copy = False
        self.client_lcid = 0
        self.use_mars = False
        self.pid = 0
        self.change_password = ""
        self.client_id = 0
        self.cafile: str | None = None
        self.validate_host = True
        self.enc_login_only = False
        self.enc_flag = 0
        self.tls_ctx = None
        self.client_tz: datetime.tzinfo = pytds.tz.local
        self.option_flag2 = 0
        self.connect_timeout = 0.0
        self.query_timeout: float | None = None
        self.blocksize = 4096
        self.readonly = False
        self.load_balancer: LoadBalancer | None = None
        self.bytes_to_unicode = False
        self.auth: AuthProtocol | None = None
        self.servers: deque[Tuple[Any, int | None, str]] = deque()
        self.server_enc_flag = 0


class _TdsEnv:
    """Plain container for session environment values."""

    def __init__(self):
        self.database = None
        self.language = None
        self.charset = None
        self.autocommit = False
        # Transaction isolation level
        self.isolation_level = 0


def _create_exception_by_message(
    msg: Message, custom_error_msg: str | None = None
) -> ProgrammingError | IntegrityError | OperationalError:
    """Build the exception object that corresponds to a server message.

    The exception class is picked by message number: numbers listed in
    ``prog_errors`` give :class:`ProgrammingError`, numbers listed in
    ``integrity_errors`` give :class:`IntegrityError`, anything else gives
    :class:`OperationalError`.

    :param msg: server message to convert
    :param custom_error_msg: if not ``None``, used as the exception's
           constructor message instead of ``msg["message"]``; the ``text``
           attribute is set from ``msg["message"]`` regardless
    :returns: the populated exception instance (it is not raised here)
    """
    msg_no = msg["msgno"]
    error_msg = msg["message"] if custom_error_msg is None else custom_error_msg

    exc_class: typing.Any
    if msg_no in prog_errors:
        exc_class = ProgrammingError
    elif msg_no in integrity_errors:
        exc_class = IntegrityError
    else:
        exc_class = OperationalError
    ex = exc_class(error_msg)

    # copy the server supplied details onto the exception
    ex.msg_no = msg["msgno"]
    ex.text = msg["message"]
    ex.srvname = msg["server"]
    ex.procname = msg["proc_name"]
    ex.number = msg["msgno"]
    ex.severity = msg["severity"]
    ex.state = msg["state"]
    ex.line = msg["line_number"]
    return ex
class Message(TypedDict):
    """Dictionary shape of a server message, as read by
    ``_create_exception_by_message``."""

    marker: int
    msgno: int
    state: int
    severity: int
    sql_state: int | None
    priv_msg_type: int
    message: str
    server: str
    proc_name: str
    line_number: int
class Route(TypedDict):
    """Dictionary shape pairing a server name with a port number."""

    server: str
    port: int
class _Results:
    """Plain container for a result set's columns, row count and description."""

    def __init__(self) -> None:
        self.columns: list[Column] = []
        self.row_count = 0
        # starts out as an empty tuple; entries are 7-item tuples
        self.description: tuple[tuple[str, Any, None, int, int, int, int], ...] = ()