# -*- coding: utf-8 -*- """ Device Tree Blob Parser Copyright 2014 Neil 'superna' Armstrong Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. @author: Neil 'superna' Armstrong """ from __future__ import print_function from __future__ import absolute_import import string import os import json from copy import deepcopy, copy from struct import Struct, unpack, pack FDT_MAGIC = 0xd00dfeed FDT_BEGIN_NODE = 0x1 FDT_END_NODE = 0x2 FDT_PROP = 0x3 FDT_NOP = 0x4 FDT_END = 0x9 INDENT = ' ' * 4 FDT_MAX_VERSION = 17 class FdtProperty(object): """ Represents an empty property""" @staticmethod def __validate_dt_name(name): """Checks the name validity""" return not any([True for char in name if char not in string.printable]) def __init__(self, name): """Init with name""" self.name = name if not FdtProperty.__validate_dt_name(self.name): raise Exception("Invalid name '%s'" % self.name) def get_name(self): """Get property name""" return self.name def __str__(self): """String representation""" return "Property(%s)" % self.name def dts_represent(self, depth=0): """Get dts string representation""" return INDENT*depth + self.name + ';' def dtb_represent(self, string_store, pos=0, version=17): """Get blob representation""" # print "%x:%s" % (pos, self) strpos = string_store.find(self.name+'\0') if strpos < 0: strpos = len(string_store) string_store += self.name+'\0' pos += 12 return (pack('>III', FDT_PROP, 0, strpos), string_store, pos) def json_represent(self, depth=0): """Ouput JSON""" return '%s: null' % json.dumps(self.name) def to_raw(self): """Return RAW value representation""" return '' def __getitem__(self, value): """Returns No Items""" return None def __ne__(self, node): """Check property inequality """ return not self.__eq__(node) def __eq__(self, node): """Check node equality check properties are the same (same values) """ if not isinstance(node, FdtProperty): raise Exception("Invalid object type") if self.name != node.get_name(): return False return True @staticmethod def __check_prop_strings(value): """Check property string validity Python version of util_is_printable_string from dtc """ pos = 0 posi = 0 end = len(value) if not len(value): return None #Needed for python 3 support: If a bytes object is passed, #decode it with the ascii codec. If the decoding fails, assume #it was not a string object. try: value = value.decode('ascii') except ValueError: return None #Test both against string 0 and int 0 because of # python2/3 compatibility if value[-1] != '\0': return None while pos < end: posi = pos while pos < end and value[pos] != '\0' \ and value[pos] in string.printable \ and value[pos] not in ('\r', '\n'): pos += 1 if value[pos] != '\0' or pos == posi: return None pos += 1 return True @staticmethod def new_raw_property(name, raw_value): """Instantiate property with raw value type""" if FdtProperty.__check_prop_strings(raw_value): return FdtPropertyStrings.init_raw(name, raw_value) elif len(raw_value) and len(raw_value) % 4 == 0: return FdtPropertyWords.init_raw(name, raw_value) elif len(raw_value) and len(raw_value): return FdtPropertyBytes.init_raw(name, raw_value) else: return FdtProperty(name) class FdtPropertyStrings(FdtProperty): """Property with strings as value""" @classmethod def __extract_prop_strings(cls, value): """Extract strings from raw_value""" return [st for st in \ value.decode('ascii').split('\0') if len(st)] def __init__(self, name, strings): """Init with strings""" FdtProperty.__init__(self, name) if not strings: raise Exception("Invalid strings") for stri in strings: if len(stri) == 0: raise Exception("Invalid strings") if any([True for char in stri if char not in string.printable or char in ('\r', '\n')]): raise Exception("Invalid chars in strings") self.strings = strings @classmethod def init_raw(cls, name, raw_value): """Init from raw""" return cls(name, cls.__extract_prop_strings(raw_value)) def dts_represent(self, depth=0): """Get dts string representation""" return INDENT*depth + self.name + ' = "' + \ '", "'.join(self.strings) + '";' def dtb_represent(self, string_store, pos=0, version=17): """Get blob representation""" # print "%x:%s" % (pos, self) blob = pack('') for chars in self.strings: blob += chars.encode('ascii') + pack('b', 0) blob_len = len(blob) if version < 16 and (pos+12) % 8 != 0: blob = pack('b', 0) * (8-((pos+12) % 8)) + blob if blob_len % 4: blob += pack('b', 0) * (4-(blob_len % 4)) strpos = string_store.find(self.name+'\0') if strpos < 0: strpos = len(string_store) string_store += self.name+'\0' blob = pack('>III', FDT_PROP, blob_len, strpos) + blob pos += len(blob) return (blob, string_store, pos) def json_represent(self, depth=0): """Ouput JSON""" result = '%s: ["strings", ' % json.dumps(self.name) result += ', '.join([json.dumps(stri) for stri in self.strings]) result += ']' return result def to_raw(self): """Return RAW value representation""" return ''.join([chars+'\0' for chars in self.strings]) def __str__(self): """String representation""" return "Property(%s,Strings:%s)" % (self.name, self.strings) def __getitem__(self, index): """Get strings, returns a string""" return self.strings[index] def __len__(self): """Get strings count""" return len(self.strings) def __eq__(self, node): """Check node equality check properties are the same (same values) """ if not FdtProperty.__eq__(self, node): return False if self.__len__() != len(node): return False for index in range(self.__len__()): if self.strings[index] != node[index]: return False return True class FdtPropertyWords(FdtProperty): """Property with words as value""" def __init__(self, name, words): """Init with words""" FdtProperty.__init__(self, name) for word in words: if not 0 <= word <= 4294967295: raise Exception(("Invalid word value %d, requires " + "0 <= number <= 4294967295") % word) if not len(words): raise Exception("Invalid Words") self.words = words @classmethod def init_raw(cls, name, raw_value): """Init from raw""" if len(raw_value) % 4 == 0: words = [unpack(">I", raw_value[i:i+4])[0] for i in range(0, len(raw_value), 4)] return cls(name, words) else: raise Exception("Invalid raw Words") def dts_represent(self, depth=0): """Get dts string representation""" return INDENT*depth + self.name + ' = <' + \ ' '.join(["0x%08x" % word for word in self.words]) + ">;" def dtb_represent(self, string_store, pos=0, version=17): """Get blob representation""" # # print "%x:%s" % (pos, self) strpos = string_store.find(self.name+'\0') if strpos < 0: strpos = len(string_store) string_store += self.name+'\0' blob = pack('>III', FDT_PROP, len(self.words)*4, strpos) + \ pack('').join([pack('>I', word) for word in self.words]) pos += len(blob) return (blob, string_store, pos) def json_represent(self, depth=0): """Ouput JSON""" result = '%s: ["words", "' % json.dumps(self.name) result += '", "'.join(["0x%08x" % word for word in self.words]) result += '"]' return result def to_raw(self): """Return RAW value representation""" return ''.join([pack('>I', word) for word in self.words]) def __str__(self): """String representation""" return "Property(%s,Words:%s)" % (self.name, self.words) def __getitem__(self, index): """Get words, returns a word integer""" return self.words[index] def __len__(self): """Get words count""" return len(self.words) def __eq__(self, node): """Check node equality check properties are the same (same values) """ if not FdtProperty.__eq__(self, node): return False if self.__len__() != len(node): return False for index in range(self.__len__()): if self.words[index] != node[index]: return False return True class FdtPropertyBytes(FdtProperty): """Property with signed bytes as value""" def __init__(self, name, bytez): """Init with bytes""" FdtProperty.__init__(self, name) for byte in bytez: if not -128 <= byte <= 127: raise Exception(("Invalid value for byte %d, " + "requires -128 <= number <= 127") % byte) if not bytez: raise Exception("Invalid Bytes") self.bytes = bytez @classmethod def init_raw(cls, name, raw_value): """Init from raw""" return cls(name, unpack('b' * len(raw_value), raw_value)) def dts_represent(self, depth=0): """Get dts string representation""" return INDENT*depth + self.name + ' = [' + \ ' '.join(["%02x" % (byte & int('ffffffff',16)) for byte in self.bytes]) + "];" def dtb_represent(self, string_store, pos=0, version=17): """Get blob representation""" # print "%x:%s" % (pos, self) strpos = string_store.find(self.name+'\0') if strpos < 0: strpos = len(string_store) string_store += self.name+'\0' blob = pack('>III', FDT_PROP, len(self.bytes), strpos) blob += pack('').join([pack('>b', byte) for byte in self.bytes]) if len(blob) % 4: blob += pack('b', 0) * (4-(len(blob) % 4)) pos += len(blob) return (blob, string_store, pos) def json_represent(self, depth=0): """Ouput JSON""" result = '%s: ["bytes", "' % json.dumps(self.name) result += '", "'.join(["%02x" % byte for byte in self.bytes]) result += '"]' return result def to_raw(self): """Return RAW value representation""" return ''.join([pack('>b', byte) for byte in self.bytes]) def __str__(self): """String representation""" return "Property(%s,Bytes:%s)" % (self.name, self.bytes) def __getitem__(self, index): """Get bytes, returns a byte""" return self.bytes[index] def __len__(self): """Get strings count""" return len(self.bytes) def __eq__(self, node): """Check node equality check properties are the same (same values) """ if not FdtProperty.__eq__(self, node): return False if self.__len__() != len(node): return False for index in range(self.__len__()): if self.bytes[index] != node[index]: return False return True class FdtNop(object): # pylint: disable-msg=R0903 """Nop child representation""" def __init__(self): """Init with nothing""" def get_name(self): # pylint: disable-msg=R0201 """Return name""" return None def __str__(self): """String representation""" return '' def dts_represent(self, depth=0): # pylint: disable-msg=R0201 """Get dts string representation""" return INDENT*depth+'// [NOP]' def dtb_represent(self, string_store, pos=0, version=17): """Get blob representation""" # print "%x:%s" % (pos, self) pos += 4 return (pack('>I', FDT_NOP), string_store, pos) class FdtNode(object): """Node representation""" @staticmethod def __validate_dt_name(name): """Checks the name validity""" return not any([True for char in name if char not in string.printable]) def __init__(self, name): """Init node with name""" self.name = name self.subdata = [] self.parent = None if not FdtNode.__validate_dt_name(self.name): raise Exception("Invalid name '%s'" % self.name) def get_name(self): """Get property name""" return self.name def __check_name_duplicate(self, name): """Checks if name is not in a subnode""" for data in self.subdata: if not isinstance(data, FdtNop) \ and data.get_name() == name: return True return False def add_subnode(self, node): """Add child, deprecated use append()""" self.append(node) def add_raw_attribute(self, name, raw_value): """Construct a raw attribute and add to child""" self.append(FdtProperty.new_raw_property(name, raw_value)) def set_parent_node(self, node): """Set parent node, None and FdtNode accepted""" if node is not None and \ not isinstance(node, FdtNode): raise Exception("Invalid object type") self.parent = node def get_parent_node(self): """Get parent node""" return self.parent def __str__(self): """String representation""" return "Node(%s)" % self.name def dts_represent(self, depth=0): """Get dts string representation""" result = ('\n').join([sub.dts_represent(depth+1) for sub in self.subdata]) if len(result) > 0: result += '\n' return INDENT*depth + self.name + ' {\n' + \ result + INDENT*depth + "};" def dtb_represent(self, strings_store, pos=0, version=17): """Get blob representation Pass string storage as strings_store, pos for current node start and version as current dtb version """ # print "%x:%s" % (pos, self) strings = strings_store if self.get_name() == '/': blob = pack('>II', FDT_BEGIN_NODE, 0) else: blob = pack('>I', FDT_BEGIN_NODE) blob += self.get_name().encode('ascii') + pack('b', 0) if len(blob) % 4: blob += pack('b', 0) * (4-(len(blob) % 4)) pos += len(blob) for sub in self.subdata: (data, strings, pos) = sub.dtb_represent(strings, pos, version) blob += data pos += 4 blob += pack('>I', FDT_END_NODE) return (blob, strings, pos) def json_represent(self, depth=0): """Get dts string representation""" result = (',\n'+ \ INDENT*(depth+1)).join([sub.json_represent(depth+1) for sub in self.subdata if not isinstance(sub, FdtNop)]) if len(result) > 0: result = INDENT + result + '\n'+INDENT*depth if self.get_name() == '/': return "{\n" + INDENT*(depth) + result + "}" else: return json.dumps(self.name) + ': {\n' + \ INDENT*(depth) + result + "}" def __getitem__(self, index): """Get subnodes, returns either a Node, a Property or a Nop""" return self.subdata[index] def __setitem__(self, index, subnode): """Set node at index, replacing previous subnode, must not be a duplicate name """ if self.subdata[index].get_name() != subnode.get_name() and \ self.__check_name_duplicate(subnode.get_name()): raise Exception("%s : %s subnode already exists" % \ (self, subnode)) if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)): raise Exception("Invalid object type") self.subdata[index] = subnode def __len__(self): """Get strings count""" return len(self.subdata) def __ne__(self, node): """Check node inequality i.e. is subnodes are the same, in either order and properties are the same (same values) The FdtNop is excluded from the check """ return not self.__eq__(node) def __eq__(self, node): """Check node equality i.e. is subnodes are the same, in either order and properties are the same (same values) The FdtNop is excluded from the check """ if not isinstance(node, FdtNode): raise Exception("Invalid object type") if self.name != node.get_name(): return False curnames = set([subnode.get_name() for subnode in self.subdata if not isinstance(subnode, FdtNop)]) cmpnames = set([subnode.get_name() for subnode in node if not isinstance(subnode, FdtNop)]) if curnames != cmpnames: return False for subnode in [subnode for subnode in self.subdata if not isinstance(subnode, FdtNop)]: index = node.index(subnode.get_name()) if subnode != node[index]: return False return True def append(self, subnode): """Append subnode, same as add_subnode""" if self.__check_name_duplicate(subnode.get_name()): raise Exception("%s : %s subnode already exists" % \ (self, subnode)) if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)): raise Exception("Invalid object type") self.subdata.append(subnode) def pop(self, index=-1): """Remove and returns subnode at index, default the last""" return self.subdata.pop(index) def insert(self, index, subnode): """Insert subnode before index, must not be a duplicate name""" if self.__check_name_duplicate(subnode.get_name()): raise Exception("%s : %s subnode already exists" % \ (self, subnode)) if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)): raise Exception("Invalid object type") self.subdata.insert(index, subnode) def _find(self, name): """Find name in subnodes""" for i in range(0, len(self.subdata)): if not isinstance(self.subdata[i], FdtNop) and \ name == self.subdata[i].get_name(): return i return None def remove(self, name): """Remove subnode with the name Raises ValueError is not present """ index = self._find(name) if index is None: raise ValueError("Not present") return self.subdata.pop(index) def index(self, name): """Returns position of subnode with the name Raises ValueError is not present """ index = self._find(name) if index is None: raise ValueError("Not present") return index def merge(self, node): """Merge two nodes and subnodes Replace current properties with the given properties """ if not isinstance(node, FdtNode): raise Exception("Can only merge with a FdtNode") for subnode in [obj for obj in node if isinstance(obj, (FdtNode, FdtProperty))]: index = self._find(subnode.get_name()) if index is None: dup = deepcopy(subnode) if isinstance(subnode, FdtNode): dup.set_parent_node(self) self.append(dup) elif isinstance(subnode, FdtNode): self.subdata[index].merge(subnode) else: self.subdata[index] = copy(subnode) def walk(self): """Walk into subnodes and yield paths and objects Returns set with (path string, node object) """ node = self start = 0 hist = [] curpath = [] while True: for index in range(start, len(node)): if isinstance(node[index], (FdtNode, FdtProperty)): yield ('/' + '/'.join(curpath+[node[index].get_name()]), node[index]) if isinstance(node[index], FdtNode): if len(node[index]): hist.append((node, index+1)) curpath.append(node[index].get_name()) node = node[index] start = 0 index = -1 break if index >= 0: if len(hist): (node, start) = hist.pop() curpath.pop() else: break class Fdt(object): """Flattened Device Tree representation""" def __init__(self, version=17, last_comp_version=16, boot_cpuid_phys=0): """Init FDT object with version and boot values""" self.header = {'magic': FDT_MAGIC, 'totalsize': 0, 'off_dt_struct': 0, 'off_dt_strings': 0, 'off_mem_rsvmap': 0, 'version': version, 'last_comp_version': last_comp_version, 'boot_cpuid_phys': boot_cpuid_phys, 'size_dt_strings': 0, 'size_dt_struct': 0} self.rootnode = None self.prenops = None self.postnops = None self.reserve_entries = None def add_rootnode(self, rootnode, prenops=None, postnops=None): """Add root node""" self.rootnode = rootnode self.prenops = prenops self.postnops = postnops def get_rootnode(self): """Get root node""" return self.rootnode def add_reserve_entries(self, reserve_entries): """Add reserved entries as list of dict with 'address' and 'size' keys""" self.reserve_entries = reserve_entries def to_dts(self): """Export to DTS representation in string format""" result = "/dts-v1/;\n" result += "// version:\t\t%d\n" % self.header['version'] result += "// last_comp_version:\t%d\n" % \ self.header['last_comp_version'] if self.header['version'] >= 2: result += "// boot_cpuid_phys:\t0x%x\n" % \ self.header['boot_cpuid_phys'] result += '\n' if self.reserve_entries is not None: for entry in self.reserve_entries: result += "/memreserve/ " if entry['address']: result += "%#x " % entry['address'] else: result += "0 " if entry['size']: result += "%#x" % entry['size'] else: result += "0" result += ";\n" if self.prenops: result += '\n'.join([nop.dts_represent() for nop in self.prenops]) result += '\n' if self.rootnode is not None: result += self.rootnode.dts_represent() if self.postnops: result += '\n' result += '\n'.join([nop.dts_represent() for nop in self.postnops]) return result def to_dtb(self): """Export to Blob format""" if self.rootnode is None: return None blob_reserve_entries = pack('') if self.reserve_entries is not None: for entry in self.reserve_entries: blob_reserve_entries += pack('>QQ', entry['address'], entry['size']) blob_reserve_entries += pack('>QQ', 0, 0) header_size = 7 * 4 if self.header['version'] >= 2: header_size += 4 if self.header['version'] >= 3: header_size += 4 if self.header['version'] >= 17: header_size += 4 header_adjust = pack('') if header_size % 8 != 0: header_adjust = pack('b', 0) * (8 - (header_size % 8)) header_size += len(header_adjust) dt_start = header_size + len(blob_reserve_entries) # print "dt_start %d" % dt_start (blob_dt, blob_strings, dt_pos) = \ self.rootnode.dtb_represent('', dt_start, self.header['version']) if self.prenops is not None: blob_dt = pack('').join([nop.dtb_represent('')[0] for nop in self.prenops])\ + blob_dt if self.postnops is not None: blob_dt += pack('').join([nop.dtb_represent('')[0] for nop in self.postnops]) blob_dt += pack('>I', FDT_END) self.header['size_dt_strings'] = len(blob_strings) self.header['size_dt_struct'] = len(blob_dt) self.header['off_mem_rsvmap'] = header_size self.header['off_dt_struct'] = dt_start self.header['off_dt_strings'] = dt_start + len(blob_dt) self.header['totalsize'] = dt_start + len(blob_dt) + len(blob_strings) blob_header = pack('>IIIIIII', self.header['magic'], self.header['totalsize'], self.header['off_dt_struct'], self.header['off_dt_strings'], self.header['off_mem_rsvmap'], self.header['version'], self.header['last_comp_version']) if self.header['version'] >= 2: blob_header += pack('>I', self.header['boot_cpuid_phys']) if self.header['version'] >= 3: blob_header += pack('>I', self.header['size_dt_strings']) if self.header['version'] >= 17: blob_header += pack('>I', self.header['size_dt_struct']) return blob_header + header_adjust + blob_reserve_entries + \ blob_dt + blob_strings.encode('ascii') def to_json(self): """Ouput JSON""" if self.rootnode is None: return None return self.rootnode.json_represent() def resolve_path(self, path): """Resolve path like /memory/reg and return either a FdtNode, a FdtProperty or None""" if self.rootnode is None: return None if not path.startswith('/'): return None if len(path) > 1 and path.endswith('/'): path = path[:-1] if path == '/': return self.rootnode curnode = self.rootnode for subpath in path[1:].split('/'): found = None if not isinstance(curnode, FdtNode): return None for node in curnode: if subpath == node.get_name(): found = node break if found is None: return None curnode = found return curnode def _add_json_to_fdtnode(node, subjson): """Populate FdtNode with JSON dict items""" for (key, value) in subjson.items(): if isinstance(value, dict): subnode = FdtNode(key) subnode.set_parent_node(node) node.append(subnode) _add_json_to_fdtnode(subnode, value) elif isinstance(value, list): if len(value) < 2: raise Exception("Invalid list for %s" % key) if value[0] == "words": words = [int(word, 16) for word in value[1:]] node.append(FdtPropertyWords(key, words)) elif value[0] == "bytes": bytez = [int(byte, 16) for byte in value[1:]] node.append(FdtPropertyBytes(key, bytez)) elif value[0] == "strings": node.append(FdtPropertyStrings(key, \ [s for s in value[1:]])) else: raise Exception("Invalid list for %s" % key) elif value is None: node.append(FdtProperty(key)) else: raise Exception("Invalid value for %s" % key) def FdtJsonParse(buf): """Import FDT from JSON representation, see JSONDeviceTree.md for structure and encoding Returns an Fdt object """ tree = json.loads(buf) root = FdtNode('/') _add_json_to_fdtnode(root, tree) fdt = Fdt() fdt.add_rootnode(root) return fdt def FdtFsParse(path): """Parse device tree filesystem and return a Fdt instance Should be /proc/device-tree on a device, or the fusemount.py mount point. """ root = FdtNode("/") if path.endswith('/'): path = path[:-1] nodes = {path: root} for subpath, subdirs, files in os.walk(path): if subpath not in nodes.keys(): raise Exception("os.walk error") cur = nodes[subpath] for f in files: with open(subpath+'/'+f, 'rb') as content_file: content = content_file.read() prop = FdtProperty.new_raw_property(f, content) cur.add_subnode(prop) for subdir in subdirs: subnode = FdtNode(subdir) cur.add_subnode(subnode) subnode.set_parent_node(cur) nodes[subpath+'/'+subdir] = subnode fdt = Fdt() fdt.add_rootnode(root) return fdt class FdtBlobParse(object): # pylint: disable-msg=R0903 """Parse from file input""" __fdt_header_format = ">IIIIIII" __fdt_header_names = ('magic', 'totalsize', 'off_dt_struct', 'off_dt_strings', 'off_mem_rsvmap', 'version', 'last_comp_version') __fdt_reserve_entry_format = ">QQ" __fdt_reserve_entry_names = ('address', 'size') __fdt_dt_cell_format = ">I" __fdt_dt_prop_format = ">II" __fdt_dt_tag_name = {FDT_BEGIN_NODE: 'node_begin', FDT_END_NODE: 'node_end', FDT_PROP: 'prop', FDT_NOP: 'nop', FDT_END: 'end'} def __extract_fdt_header(self): """Extract DTB header""" header = Struct(self.__fdt_header_format) header_entry = Struct(">I") data = self.infile.read(header.size) result = dict(zip(self.__fdt_header_names, header.unpack_from(data))) if result['version'] >= 2: data = self.infile.read(header_entry.size) result['boot_cpuid_phys'] = header_entry.unpack_from(data)[0] if result['version'] >= 3: data = self.infile.read(header_entry.size) result['size_dt_strings'] = header_entry.unpack_from(data)[0] if result['version'] >= 17: data = self.infile.read(header_entry.size) result['size_dt_struct'] = header_entry.unpack_from(data)[0] return result def __extract_fdt_reserve_entries(self): """Extract reserved memory entries""" header = Struct(self.__fdt_reserve_entry_format) entries = [] self.infile.seek(self.fdt_header['off_mem_rsvmap']) while True: data = self.infile.read(header.size) result = dict(zip(self.__fdt_reserve_entry_names, header.unpack_from(data))) if result['address'] == 0 and result['size'] == 0: return entries entries.append(result) def __extract_fdt_nodename(self): """Extract node name""" data = '' pos = self.infile.tell() while True: byte = self.infile.read(1) if ord(byte) == 0: break data += byte.decode('ascii') align_pos = pos + len(data) + 1 align_pos = (((align_pos) + ((4) - 1)) & ~((4) - 1)) self.infile.seek(align_pos) return data def __extract_fdt_string(self, prop_string_pos): """Extract string from string pool""" data = '' pos = self.infile.tell() self.infile.seek(self.fdt_header['off_dt_strings']+prop_string_pos) while True: byte = self.infile.read(1) if ord(byte) == 0: break data += byte.decode('ascii') self.infile.seek(pos) return data def __extract_fdt_prop(self): """Extract property""" prop = Struct(self.__fdt_dt_prop_format) pos = self.infile.tell() data = self.infile.read(prop.size) (prop_size, prop_string_pos,) = prop.unpack_from(data) prop_start = pos + prop.size if self.fdt_header['version'] < 16 and prop_size >= 8: prop_start = (((prop_start) + ((8) - 1)) & ~((8) - 1)) self.infile.seek(prop_start) value = self.infile.read(prop_size) align_pos = self.infile.tell() align_pos = (((align_pos) + ((4) - 1)) & ~((4) - 1)) self.infile.seek(align_pos) return (self.__extract_fdt_string(prop_string_pos), value) def __extract_fdt_dt(self): """Extract tags""" cell = Struct(self.__fdt_dt_cell_format) tags = [] self.infile.seek(self.fdt_header['off_dt_struct']) while True: data = self.infile.read(cell.size) if len(data) < cell.size: break tag, = cell.unpack_from(data) # print "*** %s" % self.__fdt_dt_tag_name.get(tag, '') if self.__fdt_dt_tag_name.get(tag, '') in 'node_begin': name = self.__extract_fdt_nodename() if len(name) == 0: name = '/' tags.append((tag, name)) elif self.__fdt_dt_tag_name.get(tag, '') in ('node_end', 'nop'): tags.append((tag, '')) elif self.__fdt_dt_tag_name.get(tag, '') in 'end': tags.append((tag, '')) break elif self.__fdt_dt_tag_name.get(tag, '') in 'prop': propdata = self.__extract_fdt_prop() tags.append((tag, propdata)) else: print("Unknown Tag %d" % tag) return tags def __init__(self, infile): """Init with file input""" self.infile = infile self.fdt_header = self.__extract_fdt_header() if self.fdt_header['magic'] != FDT_MAGIC: raise Exception('Invalid Magic') if self.fdt_header['version'] > FDT_MAX_VERSION: raise Exception('Invalid Version %d' % self.fdt_header['version']) if self.fdt_header['last_comp_version'] > FDT_MAX_VERSION-1: raise Exception('Invalid last compatible Version %d' % self.fdt_header['last_comp_version']) self.fdt_reserve_entries = self.__extract_fdt_reserve_entries() self.fdt_dt_tags = self.__extract_fdt_dt() def __to_nodes(self): """Represent fdt as Node and properties structure Returns a set with the pre-node Nops, the Root Node, and the post-node Nops. """ prenops = [] postnops = [] rootnode = None curnode = None for tag in self.fdt_dt_tags: if self.__fdt_dt_tag_name.get(tag[0], '') in 'node_begin': newnode = FdtNode(tag[1]) if rootnode is None: rootnode = newnode if curnode is not None: curnode.add_subnode(newnode) newnode.set_parent_node(curnode) curnode = newnode elif self.__fdt_dt_tag_name.get(tag[0], '') in 'node_end': if curnode is not None: curnode = curnode.get_parent_node() elif self.__fdt_dt_tag_name.get(tag[0], '') in 'nop': if curnode is not None: curnode.add_subnode(FdtNop()) elif rootnode is not None: postnops.append(FdtNop()) else: prenops.append(FdtNop()) elif self.__fdt_dt_tag_name.get(tag[0], '') in 'prop': if curnode is not None: curnode.add_raw_attribute(tag[1][0], tag[1][1]) elif self.__fdt_dt_tag_name.get(tag[0], '') in 'end': continue return (prenops, rootnode, postnops) def to_fdt(self): """Create a fdt object Returns a Fdt object """ if self.fdt_header['version'] >= 2: boot_cpuid_phys = self.fdt_header['boot_cpuid_phys'] else: boot_cpuid_phys = 0 fdt = Fdt(version=self.fdt_header['version'], last_comp_version=self.fdt_header['last_comp_version'], boot_cpuid_phys=boot_cpuid_phys) (prenops, rootnode, postnops) = self.__to_nodes() fdt.add_rootnode(rootnode, prenops=prenops, postnops=postnops) fdt.add_reserve_entries(self.fdt_reserve_entries) return fdt