mirror of
https://gitlab.isc.org/isc-projects/kea
synced 2025-08-30 21:45:37 +00:00
make a difference between binary and string data. This is mostly because strings are utf-8, while binary is, well, binary.
git-svn-id: svn://bind10.isc.org/svn/bind10/branches/parkinglot@307 e5f2f494-b856-4b98-b285-d166d9295462
This commit is contained in:
@@ -23,10 +23,11 @@ class DecodeError(Exception): pass
|
||||
|
||||
PROTOCOL_VERSION = 0x536b616e
|
||||
|
||||
_ITEM_DATA = 0x01
|
||||
_ITEM_BLOB = 0x01
|
||||
_ITEM_HASH = 0x02
|
||||
_ITEM_LIST = 0x03
|
||||
_ITEM_NULL = 0x04
|
||||
_ITEM_UTF8 = 0x08
|
||||
_ITEM_MASK = 0x0f
|
||||
|
||||
_ITEM_LENGTH_32 = 0x00
|
||||
@@ -44,7 +45,8 @@ def _encode_tag(tag):
|
||||
"""Encode a single UTF-8 tag.
|
||||
... wire_partial = Message._encode_tag('this')
|
||||
"""
|
||||
return(struct.pack(">B", len(bytearray(tag, 'utf-8'))) + bytearray(tag, 'utf-8'))
|
||||
binary = bytes(tag, 'utf-8')
|
||||
return(struct.pack(">B", len(binary))) + binary
|
||||
|
||||
def _encode_length_and_type(data, datatype):
|
||||
"""Helper method to handle the length encoding in one place."""
|
||||
@@ -58,9 +60,13 @@ def _encode_length_and_type(data, datatype):
|
||||
else:
|
||||
return(struct.pack(">B I", datatype, length) + data)
|
||||
|
||||
def _pack_string(item):
|
||||
"""Pack a string (data) and its type/length prefix."""
|
||||
return (_encode_length_and_type(bytearray(item, 'utf-8'), _ITEM_DATA))
|
||||
def _pack_utf8(item):
|
||||
"""Pack a string (utf-8) and its type/length prefix."""
|
||||
return (_encode_length_and_type(bytes(item, 'utf-8'), _ITEM_UTF8))
|
||||
|
||||
def _pack_blob(item):
|
||||
"""Pack a blob (binary data) and its type/length prefix."""
|
||||
return (_encode_length_and_type(item, _ITEM_BLOB))
|
||||
|
||||
def _pack_array(item):
|
||||
"""Pack a list (array) and its type/length prefix."""
|
||||
@@ -71,8 +77,12 @@ def _pack_hash(item):
|
||||
data = _encode_hash(item)
|
||||
return (_encode_length_and_type(data, _ITEM_HASH))
|
||||
|
||||
def _encode_string(item):
|
||||
"""Encode a string. More or less identity."""
|
||||
def _encode_utf8(item):
|
||||
"""Encode a string (utf-8). More or less identity."""
|
||||
return (item)
|
||||
|
||||
def _encode_blob(item):
|
||||
"""Encode a blob (data). More or less identity."""
|
||||
return (item)
|
||||
|
||||
def _pack_nil():
|
||||
@@ -87,22 +97,21 @@ def _encode_item(item):
|
||||
return (_pack_hash(item))
|
||||
elif type(item) == list:
|
||||
return (_pack_array(item))
|
||||
elif type(item) in (bytearray, bytes):
|
||||
return (_pack_string(item.decode()))
|
||||
elif type(item) in (bytes, bytearray):
|
||||
return (_pack_blob(item))
|
||||
else:
|
||||
return (_pack_string(str(item)))
|
||||
return (_pack_utf8(str(item)))
|
||||
|
||||
def _encode_array(item):
|
||||
"""Encode an array, where each value is encoded recursively"""
|
||||
ret = bytearray()
|
||||
ret = bytes()
|
||||
for i in item:
|
||||
ret += _encode_item(i)
|
||||
return ret
|
||||
|
||||
def _encode_hash(item):
|
||||
"""Encode a hash, where each value is encoded recursively"""
|
||||
|
||||
ret = bytearray()
|
||||
ret = bytes()
|
||||
for key, value in item.items():
|
||||
ret += _encode_tag(key)
|
||||
ret += _encode_item(value)
|
||||
@@ -159,8 +168,10 @@ def _decode_item(data):
|
||||
item = data[0:length]
|
||||
data = data[length:]
|
||||
|
||||
if item_type == _ITEM_DATA:
|
||||
value = item.decode()
|
||||
if item_type == _ITEM_BLOB:
|
||||
value = item
|
||||
elif item_type == _ITEM_UTF8:
|
||||
value = str(item, 'utf-8')
|
||||
elif item_type == _ITEM_HASH:
|
||||
value = _decode_hash(item)
|
||||
elif item_type == _ITEM_LIST:
|
||||
|
@@ -6,28 +6,37 @@ class TestCCWireEncoding(unittest.TestCase):
|
||||
|
||||
def test_to_wire_of_string(self):
|
||||
wire = ISC.CC.Message.to_wire({ "simple" : "string" })
|
||||
self.assertEqual(wire, b'Skan\x06simple!\x06string')
|
||||
self.assertEqual(wire, b'Skan\x06simple(\x06string')
|
||||
|
||||
def test_from_wire_of_string(self):
|
||||
wire = b'Skan\x06simple!\x06string'
|
||||
wire = b'Skan\x06simple(\x06string'
|
||||
decoded = ISC.CC.Message.from_wire(wire)
|
||||
self.assertEqual(decoded["simple"], "string")
|
||||
|
||||
def test_to_wire_of_binary_string(self):
|
||||
wire = ISC.CC.Message.to_wire({ "simple" : b'\x01\xff\x02\x85' })
|
||||
self.assertEqual(wire, b'Skan\x06simple!\x04\x01\xff\x02\x85')
|
||||
|
||||
def test_from_wire_of_binary_string(self):
|
||||
wire = b'Skan\x06simple!\x04\x01\xff\x02\x85'
|
||||
decoded = ISC.CC.Message.from_wire(wire)
|
||||
self.assertEqual(decoded["simple"], b'\x01\xff\x02\x85')
|
||||
|
||||
def test_to_wire_of_list(self):
|
||||
wire = ISC.CC.Message.to_wire({ "simple" : [ "string" ] })
|
||||
self.assertEqual(wire, b'Skan\x06simple#\x08!\x06string')
|
||||
self.assertEqual(wire, b'Skan\x06simple#\x08(\x06string')
|
||||
|
||||
def test_from_wire_of_list(self):
|
||||
wire = b'Skan\x06simple#\x08!\x06string'
|
||||
wire = b'Skan\x06simple#\x08(\x06string'
|
||||
decoded = ISC.CC.Message.from_wire(wire)
|
||||
self.assertEqual(decoded["simple"], [ "string" ])
|
||||
|
||||
def test_to_wire_of_hash(self):
|
||||
wire = ISC.CC.Message.to_wire({ "simple" : { "string" : 1 }})
|
||||
self.assertEqual(wire, b'Skan\x06simple"\n\x06string!\x011')
|
||||
self.assertEqual(wire, b'Skan\x06simple"\n\x06string(\x011')
|
||||
|
||||
def test_from_wire_of_hash(self):
|
||||
wire = b'Skan\x06simple"\n\x06string!\x011'
|
||||
wire = b'Skan\x06simple"\n\x06string(\x011'
|
||||
decoded = ISC.CC.Message.from_wire(wire)
|
||||
self.assertEqual(decoded["simple"], { "string" : '1' })
|
||||
|
||||
@@ -42,28 +51,28 @@ class TestCCWireEncoding(unittest.TestCase):
|
||||
|
||||
def test_to_wire_of_empty_string(self):
|
||||
wire = ISC.CC.Message.to_wire({ "simple" : "" })
|
||||
self.assertEqual(wire, b'Skan\x06simple!\x00')
|
||||
self.assertEqual(wire, b'Skan\x06simple(\x00')
|
||||
|
||||
def test_from_wire_of_empty_string(self):
|
||||
wire = b'Skan\x06simple!\x00'
|
||||
wire = b'Skan\x06simple(\x00'
|
||||
decoded = ISC.CC.Message.from_wire(wire)
|
||||
self.assertEqual(decoded["simple"], "")
|
||||
|
||||
def test_to_wire_of_utf8_string(self):
|
||||
wire = ISC.CC.Message.to_wire({ "simple" : "せんせい" })
|
||||
self.assertEqual(wire, b'Skan\x06simple!\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84')
|
||||
self.assertEqual(wire, b'Skan\x06simple(\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84')
|
||||
|
||||
def test_from_wire_of_utf8_string(self):
|
||||
wire = b'Skan\x06simple!\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84'
|
||||
wire = b'Skan\x06simple(\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84'
|
||||
decoded = ISC.CC.Message.from_wire(wire)
|
||||
self.assertEqual(decoded["simple"], "せんせい")
|
||||
|
||||
def test_to_wire_of_utf8_label(self):
|
||||
wire = ISC.CC.Message.to_wire({ "せんせい" : "string" })
|
||||
self.assertEqual(wire, b'Skan\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84!\x06string')
|
||||
self.assertEqual(wire, b'Skan\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84(\x06string')
|
||||
|
||||
def test_from_wire_of_utf8_label(self):
|
||||
wire = b'Skan\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84!\x06string'
|
||||
wire = b'Skan\x0c\xe3\x81\x9b\xe3\x82\x93\xe3\x81\x9b\xe3\x81\x84(\x06string'
|
||||
decoded = ISC.CC.Message.from_wire(wire)
|
||||
self.assertEqual(decoded["せんせい"], "string")
|
||||
|
||||
|
Reference in New Issue
Block a user