Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix json error for field that contains unicode #107

Merged
merged 3 commits into from
Sep 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Changes

* Fixed legacy auth #105

* uvloop added to test suite #106


0.0.8 (2016-08-24)
^^^^^^^^^^^^^^^^^^
Expand Down
20 changes: 16 additions & 4 deletions aiomysql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pymysql.constants import SERVER_STATUS
from pymysql.constants import CLIENT
from pymysql.constants import COMMAND
from pymysql.constants import FIELD_TYPE
from pymysql.util import byte2int, int2byte
from pymysql.converters import (escape_item, encoders, decoders,
escape_string, through)
Expand Down Expand Up @@ -973,6 +974,7 @@ def _get_descriptions(self):
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range(self.field_count):
field = yield from self.connection._read_packet(
Expand All @@ -981,14 +983,24 @@ def _get_descriptions(self):
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type in TEXT_TYPES:
charset = charset_by_id(field.charsetnr)
if charset.is_binary:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection
# encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding
# regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = charset.encoding
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data
# is encoded in ascii
encoding = 'ascii'
else:
encoding = None
Expand Down
48 changes: 47 additions & 1 deletion tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import time
import datetime
import json
import re
import time

import pytest
from pymysql import util
Expand Down Expand Up @@ -215,3 +217,47 @@ def test_rollback(connection, cursor):

# should not return any rows since no inserts was commited
assert len(data) == 0


def mysql_server_is(server_version, version_tuple):
"""Return True if the given connection is on the version given or
greater.
e.g.::
if self.mysql_server_is(conn, (5, 6, 4)):
# do something for MySQL 5.6.4 and above
"""
server_version_tuple = tuple(
(int(dig) if dig is not None else 0)
for dig in
re.match(r'(\d+)\.(\d+)\.(\d+)', server_version).group(1, 2, 3)
)
return server_version_tuple >= version_tuple


@pytest.mark.run_loop
def test_json(connection_creator, table_cleanup):
connection = yield from connection_creator(
charset="utf8mb4", autocommit=True)
server_info = connection.get_server_info()
if not mysql_server_is(server_info, (5, 7, 0)):
raise pytest.skip("JSON type is not supported on MySQL <= 5.6")

cursor = yield from connection.cursor()
yield from cursor.execute("""\
CREATE TABLE test_json (
id INT NOT NULL,
json JSON NOT NULL,
PRIMARY KEY (id)
);""")
table_cleanup("test_json")
json_str = '{"hello": "こんにちは"}'
yield from cursor.execute(
"INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
yield from cursor.execute("SELECT `json` from `test_json` WHERE `id`=42")

r = yield from cursor.fetchone()
assert json.loads(r[0]) == json.loads(json_str)

yield from cursor.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
r = yield from cursor.fetchone()
assert json.loads(r[0]) == json.loads(json_str)