# Validates the keys of a single schema entry (or of a nested member of one):
# - checks that it's a Dict (imported from a JSON object)
# - checks that all 'mandatory' fields are present and their types match
# - checks the types of all 'optional' fields
# - checks that no unknown fields are present
#
#  entry     - the object to validate
#  mandatory - list of (key, type) tuples which must be present in 'entry'
#  optional  - list of (key, type) tuples which may be present in 'entry';
#              a type of None accepts a value of any type
#
# Raises an Exception describing the first problem found, returns nothing
# otherwise.
def check_keys(entry, mandatory, optional):
    # Without this check a non-dict 'entry' would fail on '.keys()' below with
    # an AttributeError that says nothing about the schema
    if not isinstance(entry, dict):
        raise Exception("schema entry '%s' is not a JSON Object (dict)" % (entry,))

    # Keys not yet accounted for; whatever is left at the end is unknown
    keys = set(entry.keys())

    for k, t in mandatory:
        if k not in keys:
            raise Exception("missing mandatory key '%s' in schema '%s'" % (k, entry))

        keys.discard(k)

        if not isinstance(entry[k], t):
            raise Exception("key '%s' is not of the expected type '%s' in schema '%s'" % (k, t, entry))

    for k, t in optional:
        if k not in keys:
            continue

        keys.discard(k)

        if t is not None and not isinstance(entry[k], t):
            raise Exception("key '%s' is not of the expected type '%s' in schema '%s'" % (k, t, entry))

    if keys:
        # Sorted so that the message does not depend on set iteration order
        raise Exception("unhandled keys '%s' in schema '%s'" % (','.join(sorted(str(k) for k in keys)), entry))
+
+
# Validates the optional 'features' member of 'entry': every element of the
# list has to be a string. A missing 'features' member is accepted. Raises an
# Exception if any element is not a string.
def check_features_list(entry):
    features = entry.get('features', [])

    if any(not isinstance(feature, str) for feature in features):
        raise Exception("broken 'features' list in schema entry '%s'" % entry)
+
+
# Validate that the passed schema has only supported members. This is useful to
# stay up to date with any changes to the schema.
#
# The accepted layout of every 'meta-type' is described by a rule tuple:
#
#   (mandatory keys, optional keys, validate the entry's 'features', nested)
#
# where 'nested' is a list of
#
#   (list-valued key, mandatory keys, optional keys, validate 'features')
#
# tuples describing the dicts stored under the given key of the entry, which
# are validated in turn.
#
# Raises an Exception on the first entry which does not conform.
def validate_schema(schemalist):
    rules = {
        'command': (
            [('name', str),
             ('meta-type', str),
             ('arg-type', str),
             ('ret-type', str)],
            [('features', list),
             ('allow-oob', bool)],
            True,
            [],
        ),
        'event': (
            [('name', str),
             ('meta-type', str),
             ('arg-type', str)],
            [('features', list)],
            True,
            [],
        ),
        'object': (
            [('name', str),
             ('meta-type', str),
             ('members', list)],
            [('tag', str),
             ('variants', list),
             ('features', list)],
            True,
            [
                ('members',
                 [('name', str),
                  ('type', str)],
                 [('default', None),
                  ('features', list)],
                 True),
                ('variants',
                 [('case', str),
                  ('type', str)],
                 [],
                 False),
            ],
        ),
        'array': (
            [('name', str),
             ('meta-type', str),
             ('element-type', str)],
            [],
            False,
            [],
        ),
        'enum': (
            [('name', str),
             ('meta-type', str)],
            [('members', list),
             ('values', list)],
            False,
            [
                ('members',
                 [('name', str)],
                 [('features', list)],
                 True),
            ],
        ),
        'alternate': (
            [('name', str),
             ('meta-type', str),
             ('members', list)],
            [],
            False,
            [
                ('members',
                 [('type', str)],
                 [],
                 False),
            ],
        ),
        'builtin': (
            [('name', str),
             ('meta-type', str),
             ('json-type', str)],
            [],
            False,
            [],
        ),
    }

    for entry in schemalist:
        if not isinstance(entry, dict):
            raise Exception("schema entry '%s' is not a JSON Object (dict)" % (entry))

        metatype = entry.get('meta-type', None)

        # Only strings can name a rule; anything else (including unhashable
        # values) is reported the same way as an unknown 'meta-type'
        rule = rules.get(metatype) if isinstance(metatype, str) else None

        if rule is None:
            raise Exception("unknown or missing 'meta-type' in schema entry '%s'" % entry)

        required, allowed, has_features, nested = rule

        check_keys(entry, mandatory=required, optional=allowed)

        if has_features:
            check_features_list(entry)

        for key, sub_required, sub_allowed, sub_has_features in nested:
            for m in entry.get(key, []):
                check_keys(m, mandatory=sub_required, optional=sub_allowed)

                if sub_has_features:
                    check_features_list(m)
+
+
# Convert a list of QMP schema entries into a dict organized via 'name' member.
# If multiple entries share a 'name', the last one in the list is kept.
def load_schema_json_dict(schemalist):
    return {entry['name']: entry for entry in schemalist}
+
+
# Loads and validates the QMP schema from the .replies file 'filename' and
# returns it as a dict organized via the 'name' member of the entries.
# Raises an Exception if no schema was loaded from the file or if the schema
# fails validation.
def load_schema(filename):
    entries = load_schema_json_list(filename)

    if entries:
        validate_schema(entries)
        return load_schema_json_dict(entries)

    raise Exception("QMP schema not found in '%s'" % (filename))
+
+
# Recursively traverse the schema and print out the schema query strings for
# the corresponding entries. In certain cases the schema references itself,
# which is handled by passing a 'trace' list which contains the current path
#
#  name   - key into 'schema' of the entry to process
#  cur    - query string accumulated so far, used as prefix of printed lines
#  trace  - list of entry names on the current recursion path
#  schema - dict of schema entries organized via their 'name'
def iterate_schema(name, cur, trace, schema):
    node = schema[name]

    # The entry is already on the current path: report it and stop descending
    if name in trace:
        print('%s (recursion)' % cur)
        return

    # A new list is built so that the caller's 'trace' stays untouched
    path = trace + [name]
    kind = node['meta-type']

    if kind in ('command', 'event'):
        arg_type = node.get('arg-type', None)
        ret_type = node.get('ret-type', None)

        print(name)

        for feature in node.get('features', []):
            print('%s/$%s' % (cur, feature))

        if arg_type:
            iterate_schema(arg_type, cur + '/arg-type', path, schema)

        if ret_type:
            iterate_schema(ret_type, cur + '/ret-type', path, schema)

    elif kind == 'object':
        # Both lists are sorted up front, before anything is printed
        fields = sorted(node.get('members', []), key=lambda field: field['name'])
        branches = sorted(node.get('variants', []), key=lambda branch: branch['case'])

        for feature in node.get('features', []):
            print('%s/$%s' % (cur, feature))

        for field in fields:
            fieldpath = "%s/%s" % (cur, field['name'])
            print(fieldpath)

            for feature in field.get('features', []):
                print('%s/$%s' % (fieldpath, feature))

            iterate_schema(field['type'], fieldpath, path, schema)

        for branch in branches:
            branchpath = "%s/+%s" % (cur, branch['case'])
            print(branchpath)
            iterate_schema(branch['type'], branchpath, path, schema)

    elif kind == 'enum':
        values = sorted(node.get('members', []), key=lambda value: value['name'])

        for value in values:
            print('%s/^%s' % (cur, value['name']))

            for feature in value.get('features', []):
                print('%s/^%s/$%s' % (cur, value['name'], feature))

    elif kind == 'array':
        # Arrays add nothing to the query string; descend into the element type
        iterate_schema(node['element-type'], cur, path, schema)

    elif kind == 'builtin':
        print('%s/!%s' % (cur, name))

    elif kind == 'alternate':
        # Every alternative is traversed under the same query string
        for alternative in node['members']:
            iterate_schema(alternative['type'], cur, path, schema)

    else:
        raise Exception("unhandled 'meta-type' '%s'" % node.get('meta-type', '<missing>'))
+
+
# Loads and validates the schema from 'schemafile'. Unless 'validate_only' is
# set, the query strings of all 'command' and 'event' entries are then printed
# in sorted order of the entry names.
# Raises an Exception naming 'schemafile' if the schema can't be loaded.
def process_one_schema(schemafile, validate_only):
    try:
        schema = load_schema(schemafile)
    except Exception as e:
        raise Exception("Failed to load schema '%s': %s" % (schemafile, e))

    if not validate_only:
        # Only commands and events serve as starting points of the traversal
        entrypoints = sorted(name for name, entry in schema.items()
                             if entry['meta-type'] in ('command', 'event'))

        for name in entrypoints:
            iterate_schema(name, name, [], schema)
+
+
# Command line argument parser of the tool
parser = argparse.ArgumentParser(description='A tool to generate QMP schema query strings and validator of schema coverage')