Hi,

For sure, I'll try it on Monday. In the meantime I've updated my script with some initial support for multiple pools, based on the previous version and your ideas. I've tested it on a few clusters and it gives pretty good results (disk usage within a 3% range). However, I noticed that on clusters where rack weights differ a lot, the distribution of primary OSDs is uneven. Did you notice similar behavior? (You can check it in the osdmaptool output; see the sketch below.) On such clusters I also had to increase the choose_total_tries tunable to let the cluster finish rebalancing (as in your example, PGs were stuck in the active+remapped state). This mostly matters when we grow a cluster of, say, three racks by adding some new OSDs in a fourth (or n-th) rack that we don't want to, or can't, fill all at once, so it has a much smaller weight than the other three. Btw, we use a failure domain of rack with replica size 3.

Have a nice weekend,

On 13.01.2017 19:36, David Turner wrote:
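In case it helps, below is roughly how I check the primary distribution. It's just a minimal sketch (the file name count_primaries.py is made up), and it assumes the --test-map-pgs-dump lines have the form "1.f2 [34,12,87] 34" (pgid, up set, primary), i.e. the same format my script parses:

#!/usr/bin/python
# count_primaries.py -- count how often each OSD is the primary.
# Sketch only: assumes dump lines like "1.f2 [34,12,87] 34"
# (pgid, up set, primary), the same format the script below parses.
import re
import sys
import subprocess

cmd = 'osdmaptool {} --test-map-pgs-dump --mark-up-in 2>/dev/null'.format(sys.argv[1])
output = subprocess.check_output(cmd, shell=True)

primaries = {}
for line in output.split('\n'):
    p = re.match(r'^\s*[0-9a-fA-F]+\.[0-9a-fA-F]+\s+\[[\d,]+\]\s+(?P<primary>\d+)\s*$', line)
    if p:
        osd_id = int(p.group('primary'))
        primaries[osd_id] = primaries.get(osd_id, 0) + 1

# on an evenly balanced cluster these counts should be roughly equal
for osd_id in sorted(primaries):
    print 'osd.{}\t{}'.format(osd_id, primaries[osd_id])

Run it as ./count_primaries.py osdmap; on a balanced cluster the counts come out roughly equal, on our uneven-rack clusters they don't.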
--
PS
#!/usr/bin/python
# PYTHON_ARGCOMPLETE_OK

import os
import re
import sys
import time
import json
import shutil
import signal
import argparse
import subprocess
import argcomplete

finished = False


class NotEnoughPGMemberError(Exception):
    pass


def log(text):
    print text


def parse_osd_df(osd_df_path):
    # load 'ceph osd df --format json' output
    with open(osd_df_path, 'r') as fh:
        df = json.loads(fh.read())
    return df[u'nodes']


def parse_ceph_df(ceph_df_path, pools=[]):
    # load 'ceph df --format json' output and compute each pool's share
    # of the used bytes (counting only the selected pools)
    ceph_df = {}
    with open(ceph_df_path, 'r') as fh:
        df = json.loads(fh.read())
    total_used = 0
    for pool in df[u'pools']:
        if len(pools) == 0 or pool[u'name'] in pools:
            total_used += pool[u'stats'][u'bytes_used']
    for pool in df[u'pools']:
        if len(pools) == 0 or pool[u'name'] in pools:
            ceph_df[pool[u'name']] = {
                u'id': pool[u'id'],
                u'weight': round(pool[u'stats'][u'bytes_used'] / float(total_used), 2),
            }
    return ceph_df


def set_initial_crush_weights(crushmap, osd_df, choose_total_tries=None):
    if choose_total_tries is not None:
        cmd = 'crushtool -i {} -o {} --set-choose-total-tries {}'.format(
            crushmap, crushmap, choose_total_tries)
        log('setting "choose_total_tries" tunable to {}'.format(choose_total_tries))
        subprocess.check_output(cmd, shell=True, preexec_fn=os.setpgrp)
    # start from size-proportional weights: kb / 2^30 == TiB
    for osd in osd_df:
        new_osd_weight = round(osd[u'kb'] / float(1024 * 1024 * 1024), 5)
        cmd = 'crushtool -i {} -o {} --reweight-item osd.{} {}'.format(
            crushmap, crushmap, osd[u'id'], new_osd_weight)
        subprocess.check_output(cmd, shell=True, preexec_fn=os.setpgrp)


def prepare_final_crush_map(crushmap, osd_df, weights):
    final_crushmap = 'offline_crush_{}'.format(time.time())
    shutil.copy(crushmap, final_crushmap)
    for osd in osd_df:
        new_osd_weight = round(weights[str(osd[u'id'])][u'crush_weight'], 5)
        cmd = 'crushtool -i {} -o {} --reweight-item osd.{} {}'.format(
            final_crushmap, final_crushmap, osd[u'id'], new_osd_weight)
        subprocess.check_output(cmd, shell=True, preexec_fn=os.setpgrp)
    return final_crushmap


def map_pgs(osdmap, crushmap, pool_id=None):
    # run osdmaptool offline and parse the per-OSD PG counts, the
    # avg/stddev summary line and the per-PG up sets
    weights = {}
    pg_stats = {
        'min_pg': 100000000,
        'max_pg': 0,
        'mapped_min': 99,
        'mapped_max': 0,
    }

    cmd = 'osdmaptool {} --import-crush {} --test-map-pgs-dump --mark-up-in --clear-temp 2>/dev/null'.format(
        osdmap, crushmap)
    if pool_id is not None:
        cmd += ' --pool {}'.format(pool_id)
    osdmaptool = subprocess.check_output(cmd, shell=True, preexec_fn=os.setpgrp)

    for line in osdmaptool.split('\n'):
        m = re.match(r'^osd\.(?P<id>\d+)\s+(?P<pg_count>\d+)\s+(?P<first>\d+)\s+(?P<primary>\d+)\s+(?P<crush_weight>\d+(\.\d+)?)\s+(?P<weight>\d+(\.\d+)?)$', line)
        k = re.match(r'^\s+avg\s+(?P<avg>\d+)\s+stddev\s+(?P<stddev>\d+(\.\d+)?)\s+', line)
        p = re.match(r'^\s*(?P<pg>[0-9a-fA-F]+\.[0-9a-fA-F]+)\s+\[(?P<upset>[\d,]+)\]\s+(?P<primary>\d+)\s*$', line)
        if m:
            weights[m.group('id')] = {
                'pg_count': float(m.group('pg_count')),
                'crush_weight': float(m.group('crush_weight')),
            }
            if float(m.group('pg_count')) > pg_stats['max_pg']:
                pg_stats['max_pg'] = float(m.group('pg_count'))
            if float(m.group('pg_count')) < pg_stats['min_pg']:
                pg_stats['min_pg'] = float(m.group('pg_count'))
        elif k:
            pg_stats['avg'] = float(k.group('avg'))
            pg_stats['stddev'] = float(k.group('stddev'))
        elif p:
            size = len(p.group('upset').split(','))
            if pg_stats['mapped_min'] > size:
                pg_stats['mapped_min'] = size
            if pg_stats['mapped_max'] < size:
                pg_stats['mapped_max'] = size

    # if some PGs got a shorter up set than others, CRUSH gave up before
    # finding enough OSDs (this is where choose_total_tries helps)
    if pg_stats['mapped_min'] != pg_stats['mapped_max']:
        raise NotEnoughPGMemberError('unable to find enough OSDs for some '
            'PGs, pool_id == {}, mapped(min, max) == ({}, {})'.format(
                pool_id, pg_stats['mapped_min'], pg_stats['mapped_max']))
    return (weights, pg_stats)


def update_crush_weights(crushmap, old_weights, avg_pg_count, change_step):
    # nudge every OSD towards the average PG count
    stats = {
        'up': 0,
        'down': 0,
    }
    for osd_id in old_weights:
        osd = old_weights[osd_id]
        if osd['pg_count'] < avg_pg_count:
            new_osd_weight = osd['crush_weight'] * (1.0 + change_step)
            stats['up'] += 1
        elif osd['pg_count'] > avg_pg_count:
            new_osd_weight = osd['crush_weight'] * (1.0 - change_step)
            stats['down'] += 1
        else:
            continue
        cmd = 'crushtool -i {} -o {} --reweight-item osd.{} {}'.format(
            crushmap, crushmap, osd_id, new_osd_weight)
        subprocess.check_output(cmd, shell=True, preexec_fn=os.setpgrp)
    return stats


def find_optimal_osd_crush_weights_for_pool(ceph_df, osd_df, osdmap, crushmap,
                                            pool_id, target_stddev, max_rounds,
                                            pg_min_max_diff, change_step=0.005):
    global finished
    round_no = 0

    # prepare a crushmap copy to operate on
    tmp_crushmap = 'offline_crush_{}.tmp'.format(time.time())
    shutil.copy(crushmap, tmp_crushmap)

    while not finished and round_no < max_rounds:
        round_no += 1
        if round_no == 1:
            (weights, pg_stats) = map_pgs(osdmap, tmp_crushmap, pool_id=pool_id)

        if pg_stats['stddev'] <= target_stddev or \
                (pg_stats['max_pg'] - pg_stats['min_pg']) <= pg_min_max_diff:
            break

        # change weights and update stats
        update_stats = update_crush_weights(tmp_crushmap, weights,
                                            pg_stats['avg'], change_step)
        (weights, pg_stats) = map_pgs(osdmap, tmp_crushmap, pool_id=pool_id)

        # print progress info
        sys.stdout.write('\rpool: {:3.0f}, round: {:5.0f}, stddev: {:8.4f}, '
                         'up: {:4.0f}, down: {:4.0f}, min_pg: {:4.0f}, max_pg: {:4.0f}'.
                         format(pool_id, round_no, pg_stats['stddev'],
                                update_stats['up'], update_stats['down'],
                                pg_stats['min_pg'], pg_stats['max_pg']))
        sys.stdout.flush()

    # clear progress line
    sys.stdout.write('\r{}\r'.format(' ' * 100))

    # prepare final stats
    (weights, pg_stats) = map_pgs(osdmap, tmp_crushmap, pool_id=pool_id)
    os.unlink(tmp_crushmap)
    return (weights, pg_stats)


def find_optimal_osd_crush_weights(ceph_df, osd_df, osdmap, crushmap, pools,
                                   target_stddev, max_rounds, pg_min_max_diff,
                                   change_step=0.005):
    weights = {}
    pg_stats = {}
    for pool in pools:
        (weights[pool], pg_stats[pool]) = find_optimal_osd_crush_weights_for_pool(
            ceph_df=ceph_df,
            osd_df=osd_df,
            osdmap=osdmap,
            crushmap=crushmap,
            pool_id=ceph_df[pool][u'id'],
            target_stddev=target_stddev,
            max_rounds=max_rounds,
            pg_min_max_diff=pg_min_max_diff,
            change_step=change_step)

    # merge the per-pool results: each pool contributes to the final
    # weight proportionally to its share of the used bytes
    weights[u'FINAL'] = {}
    for pool in pools:
        for osd_id in weights[pool]:
            weights[u'FINAL'].setdefault(osd_id, {
                u'crush_weight': 0,
                u'pg_count': 0,
            })
            weights[u'FINAL'][osd_id][u'crush_weight'] += \
                weights[pool][osd_id][u'crush_weight'] * ceph_df[pool][u'weight']
            weights[u'FINAL'][osd_id][u'pg_count'] += \
                weights[pool][osd_id][u'pg_count']
    return (weights, pg_stats)


def exit_handler(signum, frame):
    # on ctrl-c, let the current round finish and stop gracefully
    global finished
    finished = True


if __name__ == '__main__':
    signal.signal(signal.SIGINT, exit_handler)

    parser = argparse.ArgumentParser()
    parser.add_argument('osd_df', help='path to osd df (json)', type=str)
    parser.add_argument('ceph_df', help='path to ceph df (json)', type=str)
    parser.add_argument('osdmap', help='path to osdmap (binary)', type=str)
    parser.add_argument('crushmap', help='path to crushmap (binary)', type=str)
    parser.add_argument('pools', help='pools used to calculate distribution',
                        type=str, nargs='+')
    parser.add_argument('--target-stddev', help='target stddev',
                        default=1.0, type=float)
    # not wired up yet; change_step still uses its default of 0.005
    parser.add_argument('--initial-change-step', help='change step in %%',
                        default=500, type=int)
    parser.add_argument('--max-rounds', help='max number of rounds',
                        default=1000, type=int)
    parser.add_argument('--pg-min-max-diff', default=0, type=int,
                        help='max acceptable difference between min_pg and max_pg')
    parser.add_argument('--choose-total-tries', default=None, type=int,
                        help='set choose_total_tries tunable, default: do not change')
    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    osdmap = args.osdmap
    crushmap = args.crushmap
    pools = args.pools
    osd_df = parse_osd_df(args.osd_df)
    ceph_df = parse_ceph_df(args.ceph_df, pools=pools)
    set_initial_crush_weights(crushmap, osd_df,
                              choose_total_tries=args.choose_total_tries)

    # start calculations
    (weights, pg_stats) = find_optimal_osd_crush_weights(
        ceph_df=ceph_df,
        osd_df=osd_df,
        osdmap=osdmap,
        crushmap=crushmap,
        pools=pools,
        target_stddev=args.target_stddev,
        max_rounds=args.max_rounds,
        pg_min_max_diff=args.pg_min_max_diff)

    # prepare final crushmap
    final_crushmap = prepare_final_crush_map(crushmap=crushmap, osd_df=osd_df,
                                             weights=weights['FINAL'])
    print '\nto apply run:\n\tceph osd setcrushmap -i {}\n'.format(final_crushmap)
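For completeness, this is roughly how I collect the inputs and run the whole thing (the script name reweight.py, the file names and the pool names are just placeholders, and 100 for choose_total_tries is only an example value):

ceph osd df --format json > osd_df.json
ceph df --format json > ceph_df.json
ceph osd getmap -o osdmap
ceph osd getcrushmap -o crushmap
./reweight.py osd_df.json ceph_df.json osdmap crushmap rbd volumes --choose-total-tries 100

Everything runs offline on copies of the maps via crushtool and osdmaptool, so nothing touches the cluster until you run the setcrushmap command it prints at the end.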