rlp序列化文档
RLP(Recursive Length Prefix,递归的长度前缀)是一种编码规则,可用于编码任意嵌套的二进制数据,特定的数据类型(例如:string,int等类型)则交给更高阶的协议处理。需要注意的是:int类型必须去掉首部0,且必须用大端模式表示。
编解码规则
- 编码规则
- 如果数据的长度是1个字节,并且它的值在
[0x00, 0x7f]
范围之间,那么其RLP编码就是数据本身。即前缀为空,用前缀代表数据本身; - 如果数据的长度是0-55字节,其RLP编码是前缀跟上(拼接)数据本身,前缀的值是
0x80
加上数据的长度。由于在该规则下,数据的最大长度是55,因此前缀的最大值是0x80+55=0xb7
,所以在本规则下前缀(第一个字节)的取值范围是[0x80, 0xb7]
; - 如果数据的长度大于55个字节,其RLP编码是前缀跟上数据的长度再跟上数据本身。前缀的值是0xb7加上数据长度的二进制形式的字节长度(即字符串长度的存储长度)。即用额外的空间存储字符串的长度,而前缀中只存数据的长度的长度。例如一个长度是1024的数据,数据长度的二进制形式是
0x04,0x00
,因此数据长度的长度是2个字节,所以前缀应该是0xb7+2=0xb9
,由此得到该数据的RLP编码是0xb9,0x04,0x00
再跟上数据本身。因为数据长度的长度最少需要1个字节存储,因此前缀的最小值是0xb7+1=0xb8
;又由于长度的最大值是8个字节,因此前缀的最大值是0xb7+8=0xbf
,因此在本规则下前缀的取值范围是[0xb8, 0xbf]
; - 以上3个规则是针对不需要递归的数据(例如string或int),接下来的两个规则是针对可以递归的数据,以下用列表泛指。由于列表是任意嵌套的,因此列表的编码是递归的,先编码最里层列表,再逐步往外层列表编码。如果一个列表的总长度(payload,列表的所有项经过编码后拼接在一起的字节大小)是0-55字节,其RLP编码是前缀依次跟上列表中各项的RLP编码。前缀的值是
0xc0
加上列表的总长度。在本规则下前缀的取值范围是[0xc0, 0xf7]
。本规则与规则2类似; - 如果一个列表的总长度大于55字节,它的RLP编码是前缀跟上列表的长度再依次跟上列表中各元素项的RLP编码。前缀的值是0xf7加上列表总长度的长度。编码的第一个字节的取值范围是
[0xf8, 0xff]
。本规则与规则3类似; - 在以上的五种规则中,有几个问题存在
- 满足规则1数据长度为1时,同样满足规则2,那么为什么规则2不设定为
2-55
个字节呢?当字节数为0的时候,我们不应该什么都不返回,而是应该返回一个代表数据类型的前缀,用来表示数据是否是可以嵌套的数据类型,当传入一个空的字符串时,我们返回0x80
;当传入一个空的列表时,返回0xc0
;这样设定就避免了为空的数据的无效编码,那么规则1与规则2重叠时怎么办呢?以规则1优先。
- 满足规则1数据长度为1时,同样满足规则2,那么为什么规则2不设定为
- 如果数据的长度是1个字节,并且它的值在
- 编码结果示例
- 整数
0 = [0x80]
- 整数
1 = [0x01]
- 整数
1024('0x04,0x00') = [0x82, 0x04, 0x00]
- 字符串
"" = ['0x80']
- 字符串
"d" = ['d']
- 字符串
"dog" = [0x83, 'd', 'o', 'g']
- 56位字符串
"Lorem ipsum dolor sit amet, consectetur adipisicing elit" = [ 0xb8, 0x38, 'L', 'o', 'r', 'e', 'm', ' ', ... , 'e', 'l', 'i', 't' ]
- 列表
[] = [0xc0]
- 列表
["cat","dog"] = [0xc8, 0x83, 'c', 'a', 't', 0x83, 'd', 'o', 'g']
- 嵌套列表
[ [], [[]], [ [], [[]] ] ] = [0xc7, 0xc0, 0xc1, 0xc0, 0xc3, 0xc0, 0xc1, 0xc0]
- 大列表
["cat", "Lorem ipsum dolor sit amet, consectetur adipisicing elit"] = [0xf8, 0x3e, 0x83, 'c', 'a', 't', 0xb8, 0x38, 'L', 'o', 'r', ..., 't']
- 整数
- 解码规则:根据以上编码结果的实例,可以分析得出编码结果分为三个部分:
{offset, [size], [data]}
。Offset
在五个规则中分别占用了127,56,8,56,8
个值,正好分配了一个字节所能代表的所有值,这样拿到一个RLP编码数组后,用第一个字节就能分辨出不同的类型,交给不同的逻辑去处理。- 当
offset<=127
时,数据不需要解码,直接返回结果; - 当
127<offset<=183
时,数据为不需要递归解码的简单类型,用前缀减去前缀基础值就能得到编码长度size
,把编码数组中从下标1开始共size
个字节返回即可; - 当
183<offset<=192
时,数据为不需要递归的简单类型,用前缀减去前缀基础值得到保存编码长度的字节个数sizeBuf
,获得编码数组中从下标1开始的sizeBuf
个字节,这些字节就是数据长度size
,然后从编码数组中从1+sizeBuf
下标开始的size
个字节,这些字节就是得到的数据; - 当
192<offset<=247
时,数据为包含嵌套元素的复杂类型,用127<offset<=183
的规则获得所有子元素的数组后,数组中的第一个字节为第一个嵌套元素的前缀,根据以上三种逻辑处理即可; - 当
248<offset<=255
时,数据同样是包含嵌套元素的复杂类型,用183<offset<=192
的规则获得所有子元素的数组后,数组中的第一个字节为第一个子元素的前缀,根据前三种逻辑处理即可。
- 当
实现RLP编解码
GoLang实现RLP编码
上文中我们总结出,所有的编码结果都可以分为三个部分,代表数据类型的前缀基础值offset
与代表数据长度或者数据长度的字节数size
相加得到的前缀;代表数据长度的字节数组sizeBuf
;代表数据的数组data
;最后我们需要添加一个err
属性用来返回编码过程中发生的错误。使用这样的结构可以把所有类型的编码方法都设为相同的返回值,这样便于递归数据的处理。以下为代码实现:
type Encoder struct {
Offset Offset
Size uint
SizeBuf []byte
Data []byte
err error
}
在上面的结构中,我们设置了Offset
类型,这是为了方便使用,同时也为了让代码可读性更好,下面是实现:
type Offset byte
const (
// 当需要编码的值字节长度为1,且它的值在[0x00, 0x7f](0-127)范围之间,不需要编码前缀,此时offset为0
Zero Offset = 0x00
// 当字节长度为0-55个字节且不符合第一条规则
StrLow = 0x80
// 当字节长度大于55
StrHigh = 0xb7
// 列表中的所有元素经过编码后拼接的字节长度为0-55个字节
ListLow = 0xc0
// 拼接后字节长度大于55个字节
ListHigh = 0xf7
)
同时我们为将Encode->[]byte
这一个步骤规范化,添加一个将结构体转为字节数组的内部方法:
func (e *Encoder) GetBytes() (bytes []byte, err error) {
if e.err != nil {
return bytes, e.err
}
if e.Offset != Zero {
bytes = append(bytes, byte(e.Offset)+byte(e.Size))
}
bytes = append(bytes, e.SizeBuf...)
bytes = append(bytes, e.Data...)
return
}
在代码中我们可能需要频繁的为Encoder.Data
添加数据,我们设置了一个方法来处理:
func (e *Encoder) AppendData(bytes ...byte) {
e.Data = append(e.Data, bytes...)
}
这样一个编码通用的结构就准备好了,我们现在需要一个用来完成编码的函数入口:
func Encode(object interface{}) ([]byte, error) {
e := encode(object)
return e.GetBytes()
}
一个根据类型分发的开关语句:
// pow recursive calls increase
func encode(object interface{}) (e Encoder) {
var bigIntType = reflect.TypeOf(big.Int{})
typ := reflect.TypeOf(object)
val := reflect.ValueOf(object)
kind := typ.Kind()
switch {
case kind == reflect.Interface:
e = encodeInterface(val)
case typ.AssignableTo(reflect.PtrTo(bigIntType)):
e = encodeBigIntPtr(val)
case typ.AssignableTo(bigIntType):
e = encodeBigIntNoPtr(val)
case kind >= reflect.Uint && kind <= reflect.Uintptr:
e = encodeUint(val.Uint())
case kind == reflect.Bool:
e = encodeBool(val.Bool())
case kind == reflect.String:
e = encodeString(val.String())
case kind == reflect.Slice && isByte(typ.Elem()):
e = encodeBytes(val.Bytes())
case kind == reflect.Array && isByte(typ.Elem()):
e = encodeByteArray(val)
case kind == reflect.Slice || kind == reflect.Array:
e = encodeSlice(val)
case kind == reflect.Struct:
e = encodeStruct(val)
case kind == reflect.Ptr:
e = encodePointer(val)
default:
e.err = fmt.Errorf("rlp: unsupported type: %s, kind: %s", typ, kind)
}
return
}
func isByte(typ reflect.Type) bool {
return typ.Kind() == reflect.Uint8
}
因为实现的类型有很多,我就只粘贴基本类型和嵌套类型的两种实现:
// 基本类型
func encodeBytes(b []byte) (e Encoder) {
length := len(b)
if length == 1 && b[0] <= 0x7f {
e.Offset = Zero
} else if length < 56 {
e.Offset = StrLow
e.Size = uint(length)
} else {
e.Offset = StrHigh
// 获得一个uint值的二进制字节数组和这个数组的长度
data, size := uintByteSize(uint64(length))
e.Size = size
e.SizeBuf = data
}
e.AppendData(b...)
return
}
// 嵌套类型
func encodeSlice(val reflect.Value) (e Encoder) {
vLen := val.Len()
for i := 0; i < vLen; i++ {
// if not called Interface(), reflect.TypeOf(elem).Kind() always be struct
elem := val.Index(i).Interface()
elemEncoder := encode(elem)
data, err := elemEncoder.GetBytes()
if err != nil {
e.err = err
return
}
e.AppendData(data...)
}
e.Size = uint(len(e.Data))
if e.Size < 56 {
e.Offset = ListLow
} else {
e.Offset = ListHigh
e.SizeBuf, e.Size = uintByteSize(uint64(e.Size))
}
return
}
GoLang实现RLP解码
其实解码的实现与编码的逻辑大致相同,不同的是我们需要接口调用者传递的空容器的类型来进行编码,例如:编码数组[0x80]
作为基本类型的空值,当调用者传给我们的容器类型为string
时,我们应该返回一个空串;如果容器类型是uint
,我们应该返回0;如果是一个bool
,我们应该返回false。以下为Decode
接口:
func Decode(data []byte, container interface{}) error {
if container == nil {
return errDecodeIntoNil
}
if data == nil || len(data) == 0 {
return errDecodeDataNil
}
d, err := MakeDeCoder(data)
if err != nil {
return err
}
return decode(d, container)
}
以下为结构体的实现:
type Kind int
const (
Byte Kind = iota
String
List
)
type Decoder struct {
Kind Kind
Data []byte
Size uint64
SizeBuf []byte
}
我们在得到数据后就应该把数据转为结构体:
func MakeDeCoder(data []byte) (Decoder, error) {
d := Decoder{}
length := len(data)
if data == nil || length == 0 {
return d, errDecodeDataNil
}
if length == 1 && data[0] <= 0x7f {
d.Kind = Byte
d.Data = data
d.Size = 1
} else if length == 1 {
d.Kind = String
} else {
kind, size, bitSize, err := getKind(data)
if err != nil {
return d, err
}
bitLastIndex := 1 + bitSize
d.Kind = kind
if bitLastIndex > 1 {
d.SizeBuf = data[1:bitLastIndex]
}
d.Data = data[bitLastIndex:]
d.Size = size
}
return d, nil
}
根据编码规则我们需要实现一个可以分辨数据类型的函数getKind()
:
func getKind(data []byte) (kind Kind, size uint64, bitSize uint8, err error) {
offset := data[0]
switch {
case offset < StrLow:
kind = Byte
size = 1
case offset <= StrHigh:
kind = String
size = uint64(offset - StrLow)
case offset < ListLow:
kind = String
bitSize = uint8(offset - StrHigh)
size = bitToUint(data[1:bitSize+1])
if size < 56 {
err = ErrCanonSize
}
case offset <= ListHigh:
kind = List
size = uint64(offset - ListLow)
default:
kind = List
bitSize = uint8(offset - ListHigh)
size = bitToUint(data[1:bitSize+1])
if size < 56 {
err = ErrCanonSize
}
}
return
}
func bitToUint(bytes []byte) uint64 {
bitsLen := 8 - len(bytes)
zeroBits := make([]byte, bitsLen, bitsLen)
bits := append(zeroBits, bytes...)
return binary.BigEndian.Uint64(bits)
}
分辨容器类型的开关语句基本和解码类似,我就不浪费篇幅贴出来了。将解码结果赋值给容器需要用到GoLang的反射,这里给出基本类型和嵌套类型的两种实现:
// 基本类型
func decodeUint(decoder Decoder, val reflect.Value) error {
if err := decoder.ValidBytes(64); err != nil {
return err
}
i := bitToUint(decoder.Data)
val.SetUint(i)
return nil
}
// 嵌套类型
func decodeStruct(decoder Decoder, val reflect.Value) (err error) {
numField := val.NumField()
for i, dataIndex := 0, 0; i < numField; i++ {
dataIndex, err = decodeListElem(decoder.Data, dataIndex, val.Field(i))
if err != nil {
break
}
}
return
}
func decodeListElem(data []byte, cursor int, elem reflect.Value) (int, error) {
kind, size, bitSize, err := getKind(data[cursor:])
if err != nil {
return 0, err
}
var elemSize uint64
switch {
case kind == Byte || (kind == String && size == 0):
elemSize = 1 + uint64(cursor)
default:
elemSize = 1 + uint64(bitSize) + size + uint64(cursor)
}
elemSlice := data[cursor:elemSize]
decoder, err := MakeDeCoder(elemSlice)
if err != nil {
return 0, err
}
cursor = int(elemSize)
return cursor, decodeValue(decoder, elem)
}
编码只需要把拿到的数据按照规则转为字节数组返回就可以,但解码却需要验证得到的数据是否是正确的RLP编码,同时还要验证编码类型与容器类型是否匹配,为此我们专门设置了一些error
来表示:
var (
ErrExpectedString = errors.New("rlp: expected String or Byte")
ErrCanonSize = errors.New("rlp: non-canonical size information")
ErrCanonZero = errors.New("rlp: non-canonical offset is zero")
ErrCanonOffset = errors.New("rlp: container type does not match coding")
errDecodeIntoNil = errors.New("rlp: pointer given to Decode must not be nil")
errDecodeDataNil = errors.New("rlp: decode bytes must not be nil")
errUintOverflow = errors.New("rlp: uint overflow")
errNoPointer = errors.New("rlp: interface given to Decode must be a pointer")
)
以上,RLP编解码的代码实现就介绍完了。
延伸:
- rlp的优势与缺点?
答:RLP的优势在于编码规则简洁优雅,只针对字节数据编码的规则不会受到上层协议复杂的业务场景的影响;不同于JSON或者XML等序列化方法,需要用到大量的分隔符填充到数据中导致数据体量增大,RLP中只使用一个字节的编码前缀就完全可以区分不同的编码结构,极大程度的减小了序列化后的数据体量。相应的缺点就是可读性差,如果是对字节等知识不了解的人,可能一点都看不懂;同样的,因为RLP的编码协议偏底层,开发人员可能也需要较多的精力去学习才能完全掌握;另外因为编码前缀的设定,RLP对数据的长度是有限制的,代表数据长度的值不能大于8个字节。
- unicode是否支持?
答:RLP编码规则本身是对二进制字节数据的编码,并不会影响到字符集的处理,字符集的处理将由上级协议完成。也就是说,当你用java的默认字符集ISO将一个string转为byte数组a,RLP会根据byte[]处理,然后将编码结果传递给GoLang,在GoLang中将收到的数据解码,此时解码的结果与byte数组a是完全相同的,但是因为GoLang只支持UTF-8字符集,那么你得到的当然会是乱码,以上情况RLP是无能为力的。所以在这里我建议所有实现RLP的客户端统一使用UTF-8字符集。
- 不同字节序大端BE(Big Endian),小端LE(Little Endian)是否兼容?
答:不兼容。与上诉问题2类似,RLP编码只是对二进制字节数据的编码,字节序也应该交给上级协议完成。在客户端a中,我使用BE作为RLP实现规则,将整数
1024
编码为[0x82, 0x04, 0x00]
),将编码结果传输给客户端b后,我使用LE解码收到的数据,解码结果为4
。以上因为字节序的不同导致编解码前后结果不同的问题,RLP协议本身是无法处理的,这里就需要上级协议的规范化来保证编解码的正确性。考虑到BE现在受众更广泛,所以代码中实现了BE,关键代码在解码实现的bitToUint()函数最后一行。
有疑问加站长微信联系(非本文作者)