As there have been more reports of timeouts connected to the mirrorlist servers, I had a closer look at the code. It seems we are hitting the default value of max_children = 40. This freeze break request increases it to 80 via a hotfix. See the commit message below for further details; a short illustrative sketch of the max_children behaviour follows the patches.

Adrian

commit 2b8e284e687d097c2cad6cd24dc3db3aa23f837d
Author: Adrian Reber <adrian@xxxxxxxx>
Date: Thu Oct 15 14:03:02 2015 +0000

    Increase the number of possible child processes

    The mirrorlist-server is the process which has the mirrorlist data
    loaded and which is accessed by the public facing
    mirrorlist_client.wsgi. The mirrorlist-server uses the
    ForkingUnixStreamServer, which has a default of max_children = 40
    (https://hg.python.org/cpython/file/2.7/Lib/SocketServer.py#l516).

    Looking at the code of ForkingUnixStreamServer, it says at
    https://hg.python.org/cpython/file/2.7/Lib/SocketServer.py#l523

    # If we're above the max number of children, wait and reap them until
    # we go back below threshold. Note that we use waitpid(-1) below to be
    # able to collect children in size(<defunct children>) syscalls instead
    # of size(<children>): the downside is that this might reap children
    # which we didn't spawn, which is why we only resort to this when we're
    # above max_children.

    As we are running the wsgi with processes=45, this sounds like it can
    lead to a situation where it might just hang. This increases
    max_children to 80, and maybe processes could be upped to 60 or 70.

    Signed-off-by: Adrian Reber <adrian@xxxxxxxx>

diff --git a/files/hotfix/mirrorlist/mirrorlist_server.py b/files/hotfix/mirrorlist/mirrorlist_server.py
index 2d98fa0..2b81912 100644
--- a/files/hotfix/mirrorlist/mirrorlist_server.py
+++ b/files/hotfix/mirrorlist/mirrorlist_server.py
@@ -938,6 +938,7 @@ def sigterm_handler(signum, frame):
 
 class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer):
     request_queue_size = 300
+    max_children = 80
     def finish_request(self, request, client_address):
         signal.signal(signal.SIGHUP, signal.SIG_IGN)
         BaseServer.finish_request(self, request, client_address)

diff --git a/roles/mirrormanager/mirrorlist2/tasks/main.yml b/roles/mirrormanager/mirrorlist2/tasks/main.yml
index 786432f..2e5f5ee 100644
--- a/roles/mirrormanager/mirrorlist2/tasks/main.yml
+++ b/roles/mirrormanager/mirrorlist2/tasks/main.yml
@@ -73,3 +73,12 @@
 # tags:
 # - mirrorlist2
 # - selinux
+
+
+- name: HOTFIX - increase the number of possible child processes
+  copy: src="{{ files }}/hotfix/mirrorlist/mirrorlist_server.py" dest=/usr/share/mirrormanager2/mirrorlist_server.py
+        owner=root group=root mode=0755
+  tags:
+  - hotfix
+  notify:
+  - restart mirrorlist-server

commit d04588274a6e161a36d8dbd9c91ea0fe78de017e
Author: Adrian Reber <adrian@xxxxxxxx>
Date: Thu Oct 15 13:54:53 2015 +0000

    Original mirrorlist_server.py which needs to be hotfixed

    Signed-off-by: Adrian Reber <adrian@xxxxxxxx>

diff --git a/files/hotfix/mirrorlist/mirrorlist_server.py b/files/hotfix/mirrorlist/mirrorlist_server.py
new file mode 100644
index 0000000..2d98fa0
--- /dev/null
+++ b/files/hotfix/mirrorlist/mirrorlist_server.py
@@ -0,0 +1,1136 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2007-2013 Dell, Inc.
+# by Matt Domsch <Matt_Domsch@xxxxxxxx> +# Licensed under the MIT/X11 license + +# standard library modules in alphabetical order +from collections import defaultdict +import datetime +import getopt +import logging +import logging.handlers +import os +import random +import cPickle as pickle +import select +import signal +import socket +from SocketServer import (StreamRequestHandler, ForkingMixIn, + UnixStreamServer, BaseServer) +import sys +from string import zfill, atoi +import time +import traceback + +try: + import threading +except ImportError: + import dummy_threading as threading + +# not-so-standard library modules that this program needs +from IPy import IP +import GeoIP +import radix +from weighted_shuffle import weighted_shuffle + +# can be overridden on the command line +pidfile = '/var/run/mirrormanager/mirrorlist_server.pid' +socketfile = '/var/run/mirrormanager/mirrorlist_server.sock' +cachefile = '/var/lib/mirrormanager/mirrorlist_cache.pkl' +internet2_netblocks_file = '/var/lib/mirrormanager/i2_netblocks.txt' +global_netblocks_file = '/var/lib/mirrormanager/global_netblocks.txt' +logfile = None +debug = False +must_die = False +# at a point in time when we're no longer serving content for versions +# that don't use yum prioritymethod=fallback +# (e.g. after Fedora 7 is past end-of-life) +# then we can set this value to True +# this only affects results requested using path=... +# for dirs which aren't repositories (such as iso/) +# because we don't know the Version associated with that dir here. +default_ordered_mirrorlist = False + +gipv4 = None +gipv6 = None + +# key is strings in tuple (repo.prefix, arch) +mirrorlist_cache = {} + +# key is directory.name, returns keys for mirrorlist_cache +directory_name_to_mirrorlist = {} + +# key is an IPy.IP structure, value is list of host ids +host_netblock_cache = {} + +# key is hostid, value is list of countries to allow +host_country_allowed_cache = {} + +repo_arch_to_directoryname = {} + +# redirect from a repo with one name to a repo with another +repo_redirect = {} +country_continent_redirect_cache = {} + +# our own private copy of country_continents to be edited +country_continents = GeoIP.country_continents + +disabled_repositories = {} +host_bandwidth_cache = {} +host_country_cache = {} +host_max_connections_cache = {} +file_details_cache = {} +hcurl_cache = {} +asn_host_cache = {} +internet2_tree = radix.Radix() +global_tree = radix.Radix() +host_netblocks_tree = radix.Radix() +netblock_country_tree = radix.Radix() +location_cache = {} +netblock_country_cache = {} + +## Set up our syslog data. 
+syslogger = logging.getLogger('mirrormanager') +syslogger.setLevel(logging.INFO) +handler = logging.handlers.SysLogHandler( + address='/dev/log', + facility=logging.handlers.SysLogHandler.LOG_LOCAL4) +syslogger.addHandler(handler) + + +def lookup_ip_asn(tree, ip): + """ @t is a radix tree + @ip is an IPy.IP object which may be contained in an entry in l + """ + node = tree.search_best(ip.strNormal()) + if node is None: + return None + return node.data['asn'] + + +def uniqueify(seq, idfun=None): + # order preserving + if idfun is None: + def idfun(x): return x + seen = {} + result = [] + for item in seq: + marker = idfun(item) + # in old Python versions: + # if seen.has_key(marker) + # but in new ones: + if marker in seen: continue + seen[marker] = 1 + result.append(item) + return result + + +##### Metalink Support ##### + +def metalink_header(): + # fixme add alternate format pubdate when specified + pubdate = datetime.datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT") + doc = '' + doc += '<?xml version="1.0" encoding="utf-8"?>\n' + doc += '<metalink version="3.0" xmlns="http://www.metalinker.org/"' + doc += ' type="dynamic"' + doc += ' pubdate="%s"' % pubdate + doc += ' generator="mirrormanager"' + doc += ' xmlns:mm0="http://fedorahosted.org/mirrormanager"' + doc += '>\n' + return doc + + +def metalink_failuredoc(message=None): + doc = metalink_header() + if message is not None: + doc += '<!--\n' + doc += message + '\n' + doc += '-->\n' + doc += '</metalink>\n' + return doc + + +def metalink_file_not_found(directory, file): + message = '%s/%s not found or has no metalink' % (directory, file) + return metalink_failuredoc(message) + + +def metalink(cache, directory, file, hosts_and_urls): + preference = 100 + try: + fdc = file_details_cache[directory] + detailslist = fdc[file] + except KeyError: + return ('metalink', 404, metalink_file_not_found(directory, file)) + + def indent(n): + return ' ' * n * 2 + + doc = metalink_header() + doc += indent(1) + '<files>\n' + doc += indent(2) + '<file name="%s">\n' % (file) + y = detailslist[0] + + def details(y, indentlevel=2): + doc = '' + if y['timestamp'] is not None: + doc += indent(indentlevel+1) \ + + '<mm0:timestamp>%s</mm0:timestamp>\n' % y['timestamp'] + if y['size'] is not None: + doc += indent(indentlevel+1) + '<size>%s</size>\n' % y['size'] + doc += indent(indentlevel+1) + '<verification>\n' + hashes = ('md5', 'sha1', 'sha256', 'sha512') + for h in hashes: + if y[h] is not None: + doc += indent(indentlevel+2) \ + + '<hash type="%s">%s</hash>\n' % (h, y[h]) + doc += indent(indentlevel+1) + '</verification>\n' + return doc + + doc += details(y, 2) + # there can be multiple files + if len(detailslist) > 1: + doc += indent(3) + '<mm0:alternates>\n' + for y in detailslist[1:]: + doc += indent(4) + '<mm0:alternate>\n' + doc += details(y,5) + doc += indent(4) + '</mm0:alternate>\n' + doc += indent(3) + '</mm0:alternates>\n' + + doc += indent(3) + '<resources maxconnections="1">\n' + for (hostid, hcurls) in hosts_and_urls: + private = '' + if hostid not in cache['global']: + private = 'mm0:private="True"' + for url in hcurls: + protocol = url.split(':')[0] + # FIXME January 2010 + # adding protocol= here is not part of the Metalink 3.0 spec, + # but MirrorManager 1.2.6 used it accidentally, as did + # yum 3.2.20-3 as released in Fedora 8, 9, and 10. After those + # three are EOL (~January 2010), the extra protocol= can be + # removed. 
+ doc += indent(4) + \ + '<url protocol="%s" type="%s" location="%s" '\ + 'preference="%s" %s>' % ( + protocol, protocol, host_country_cache[hostid].upper(), + preference, private) + doc += url + doc += '</url>\n' + preference = max(preference-1, 1) + doc += indent(3) + '</resources>\n' + doc += indent(2) + '</file>\n' + doc += indent(1) + '</files>\n' + doc += '</metalink>\n' + return ('metalink', 200, doc) + + +def tree_lookup(tree, ip, field, maxResults=None): + # fast lookup in the tree; if present, find all the matching values by + # deleting the found one and searching again this is safe w/o copying + # the tree again only because this is the only place the tree is used, + # and we'll get a new copy of the tree from our parent the next time it + # fork()s. + # returns a list of tuples (prefix, data) + result = [] + len_data = 0 + if ip is None: + return result + node = tree.search_best(ip.strNormal()) + while node is not None: + prefix = node.prefix + if type(node.data[field]) == list: + len_data += len(node.data[field]) + else: + len_data += 1 + t = (prefix, node.data[field],) + result.append(t) + if maxResults is None or len_data < maxResults: + tree.delete(prefix) + node = tree.search_best(ip.strNormal()) + else: + break + return result + + +def trim_by_client_country(s, clientCountry): + if clientCountry is None: + return s + r = s.copy() + for hostid in s: + if hostid in host_country_allowed_cache and \ + clientCountry not in host_country_allowed_cache[hostid]: + r.remove(hostid) + return r + + +def shuffle(s): + l = [] + for hostid in s: + item = (host_bandwidth_cache[hostid], hostid) + l.append(item) + newlist = weighted_shuffle(l) + results = [] + for (bandwidth, hostid) in newlist: + results.append(hostid) + return results + + +continents = {} + +def handle_country_continent_redirect(): + new_country_continents = GeoIP.country_continents + for country, continent in country_continent_redirect_cache.iteritems(): + new_country_continents[country] = continent + global country_continents + country_continents = new_country_continents + + +def setup_continents(): + new_continents = defaultdict(list) + handle_country_continent_redirect() + for c, continent in country_continents.iteritems(): + new_continents[continent].append(c) + global continents + continents = new_continents + + +def do_global(kwargs, cache, clientCountry, header): + c = trim_by_client_country(cache['global'], clientCountry) + header += 'country = global ' + return (header, c) + + +def do_countrylist(kwargs, cache, clientCountry, requested_countries, header): + + def collapse(d): + """ collapses a dict {key:set(hostids)} into a set of hostids """ + s = set() + for country, hostids in d.iteritems(): + for hostid in hostids: + s.add(hostid) + return s + + country_cache = {} + for c in requested_countries: + if c in cache['byCountry']: + country_cache[c] = cache['byCountry'][c] + header += 'country = %s ' % c + s = collapse(country_cache) + s = trim_by_client_country(s, clientCountry) + return (header, s) + + +def get_same_continent_countries(clientCountry, requested_countries): + result = [] + for r in requested_countries: + if r in country_continents: + requestedCountries = [ + c.upper() for c in continents[country_continents[r]] + if c != clientCountry] + result.extend(requestedCountries) + result = uniqueify(result) + return result + + +def do_continent(kwargs, cache, clientCountry, requested_countries, header): + if len(requested_countries) > 0: + rc = requested_countries + else: + rc = [clientCountry] + clist = 
get_same_continent_countries(clientCountry, rc) + return do_countrylist(kwargs, cache, clientCountry, clist, header) + + +def do_country(kwargs, cache, clientCountry, requested_countries, header): + if 'GLOBAL' in requested_countries: + return do_global(kwargs, cache, clientCountry, header) + return do_countrylist( + kwargs, cache, clientCountry, requested_countries, header) + + +def do_netblocks(kwargs, cache, header): + hostresults = set() + if not kwargs.has_key('netblock') or kwargs['netblock'] == "1": + tree_results = tree_lookup(host_netblocks_tree, kwargs['IP'], 'hosts') + for (prefix, hostids) in tree_results: + for hostid in hostids: + if hostid in cache['byHostId']: + hostresults.add((prefix, hostid,)) + header += 'Using preferred netblock ' + return (header, hostresults) + + +def do_internet2(kwargs, cache, clientCountry, header): + hostresults = set() + ip = kwargs['IP'] + if ip is None: + return (header, hostresults) + asn = lookup_ip_asn(internet2_tree, ip) + if asn is not None: + header += 'Using Internet2 ' + if clientCountry is not None \ + and clientCountry in cache['byCountryInternet2']: + hostresults = cache['byCountryInternet2'][clientCountry] + hostresults = trim_by_client_country(hostresults, clientCountry) + return (header, hostresults) + + +def do_asn(kwargs, cache, header): + hostresults = set() + ip = kwargs['IP'] + if ip is None: + return (header, hostresults) + asn = lookup_ip_asn(global_tree, ip) + if asn is not None and asn in asn_host_cache: + for hostid in asn_host_cache[asn]: + if hostid in cache['byHostId']: + hostresults.add(hostid) + header += 'Using ASN %s ' % asn + return (header, hostresults) + + +def do_geoip(kwargs, cache, clientCountry, header): + hostresults = set() + if clientCountry is not None and clientCountry in cache['byCountry']: + hostresults = cache['byCountry'][clientCountry] + header += 'country = %s ' % clientCountry + hostresults = trim_by_client_country(hostresults, clientCountry) + return (header, hostresults) + + +def do_location(kwargs, header): + hostresults = set() + if 'location' in kwargs and kwargs['location'] in location_cache: + hostresults = set(location_cache[kwargs['location']]) + header += "Using location %s " % kwargs['location'] + return (header, hostresults) + + +def append_path(hosts, cache, file, pathIsDirectory=False): + """ given a list of hosts, return a list of objects: + [(hostid, [hcurls]), ... ] + in the same order, appending file if it's not None""" + subpath = None + results = [] + if 'subpath' in cache: + subpath = cache['subpath'] + for hostid in hosts: + hcurls = [] + for hcurl_id in cache['byHostId'][hostid]: + s = hcurl_cache[hcurl_id] + if subpath is not None: + s += "/" + subpath + if file is None and pathIsDirectory: + s += "/" + if file is not None: + if not s.endswith('/'): + s += "/" + s += file + hcurls.append(s) + results.append((hostid, hcurls)) + return results + + +def trim_to_preferred_protocols(hosts_and_urls): + """ remove all but http and ftp URLs, + and if both http and ftp are offered, + leave only http. Return [(hostid, url), ...] 
""" + results = [] + try_protocols = ('https', 'http', 'ftp') + for (hostid, hcurls) in hosts_and_urls: + protocols = {} + url = None + for hcurl in hcurls: + for p in try_protocols: + if hcurl.startswith(p+':'): + protocols[p] = hcurl + + for p in try_protocols: + if p in protocols: + url = protocols[p] + break + + if url is not None: + results.append((hostid, url)) + return results + + +def client_ip_to_country(ip): + clientCountry = None + if ip is None: + return None + + # lookup in the cache first + tree_results = tree_lookup( + netblock_country_tree, ip, 'country', maxResults=1) + if len(tree_results) > 0: + (prefix, clientCountry) = tree_results[0] + return clientCountry + + # attempt IPv6, then IPv6 6to4 as IPv4, then Teredo, then IPv4 + try: + if ip.version() == 6: + if gipv6 is not None: + clientCountry = gipv6.country_code_by_addr_v6( + ip.strNormal()) + if clientCountry is None: + # Try the IPv6-to-IPv4 translation schemes + for scheme in (convert_6to4_v4, convert_teredo_v4): + result = scheme(ip) + if result is not None: + ip = result + break + if ip.version() == 4 and gipv4 is not None: + clientCountry = gipv4.country_code_by_addr(ip.strNormal()) + except: + pass + return clientCountry + + +def do_mirrorlist(kwargs): + global debug + global logfile + + def return_error(kwargs, message='', returncode=200): + d = dict( + returncode=returncode, + message=message, + resulttype='mirrorlist', + results=[]) + if 'metalink' in kwargs and kwargs['metalink']: + d['resulttype'] = 'metalink' + d['results'] = metalink_failuredoc(message) + return d + + if not (kwargs.has_key('repo') \ + and kwargs.has_key('arch')) \ + and not kwargs.has_key('path'): + return return_error( + kwargs, + message='# either path=, or repo= and arch= must be specified') + + file = None + cache = None + pathIsDirectory = False + if kwargs.has_key('path'): + path = kwargs['path'].strip('/') + + # Strip duplicate "//" from the path + path = path.replace('//', '/') + + header = "# path = %s " % (path) + + sdir = path.split('/') + try: + # path was to a directory + cache = mirrorlist_cache['/'.join(sdir)] + pathIsDirectory=True + except KeyError: + # path was to a file, try its directory + file = sdir[-1] + sdir = sdir[:-1] + try: + cache = mirrorlist_cache['/'.join(sdir)] + except KeyError: + return return_error( + kwargs, message=header + 'error: invalid path') + dir = '/'.join(sdir) + else: + if u'source' in kwargs['repo']: + kwargs['arch'] = u'source' + repo = repo_redirect.get(kwargs['repo'], kwargs['repo']) + arch = kwargs['arch'] + header = "# repo = %s arch = %s " % (repo, arch) + + if repo in disabled_repositories: + return return_error(kwargs, message=header + 'repo disabled') + try: + dir = repo_arch_to_directoryname[(repo, arch)] + if 'metalink' in kwargs and kwargs['metalink']: + dir += '/repodata' + file = 'repomd.xml' + else: + pathIsDirectory=True + cache = mirrorlist_cache[dir] + except KeyError: + repos = repo_arch_to_directoryname.keys() + repos.sort() + repo_information = header + "error: invalid repo or arch\n" + repo_information += "# following repositories are available:\n" + for i in repos: + if i[0] is not None and i[1] is not None: + repo_information += "# repo=%s&arch=%s\n" % i + return return_error(kwargs, message=repo_information) + + # set kwargs['IP'] exactly once + try: + kwargs['IP'] = IP(kwargs['client_ip']) + except: + kwargs['IP'] = None + + ordered_mirrorlist = cache.get( + 'ordered_mirrorlist', default_ordered_mirrorlist) + done = 0 + location_results = set() + netblock_results = 
set() + asn_results = set() + internet2_results = set() + country_results = set() + geoip_results = set() + continent_results = set() + global_results = set() + + header, location_results = do_location(kwargs, header) + + requested_countries = [] + if kwargs.has_key('country'): + requested_countries = uniqueify( + [c.upper() for c in kwargs['country'].split(',') ]) + + # if they specify a country, don't use netblocks or ASN + if not 'country' in kwargs: + header, netblock_results = do_netblocks(kwargs, cache, header) + if len(netblock_results) > 0: + if not ordered_mirrorlist: + done=1 + + if not done: + header, asn_results = do_asn(kwargs, cache, header) + if len(asn_results) + len(netblock_results) >= 3: + if not ordered_mirrorlist: + done = 1 + + clientCountry = client_ip_to_country(kwargs['IP']) + + if clientCountry is None: + print_client_country = "N/A" + else: + print_client_country = clientCountry + + if debug and kwargs.has_key('repo') and kwargs.has_key('arch'): + msg = "IP: %s; DATE: %s; COUNTRY: %s; REPO: %s; ARCH: %s\n" % ( + (kwargs['IP'] or 'None'), time.strftime("%Y-%m-%d"), + print_client_country, kwargs['repo'], kwargs['arch']) + + sys.stdout.write(msg) + sys.stdout.flush() + + if logfile is not None: + logfile.write(msg) + logfile.flush() + + if not done: + header, internet2_results = do_internet2( + kwargs, cache, clientCountry, header) + if len(internet2_results) + len(netblock_results) + len(asn_results) >= 3: + if not ordered_mirrorlist: + done = 1 + + if not done and 'country' in kwargs: + header, country_results = do_country( + kwargs, cache, clientCountry, requested_countries, header) + if len(country_results) == 0: + header, continent_results = do_continent( + kwargs, cache, clientCountry, requested_countries, header) + done = 1 + + if not done: + header, geoip_results = do_geoip( + kwargs, cache, clientCountry, header) + if len(geoip_results) >= 3: + if not ordered_mirrorlist: + done = 1 + + if not done: + header, continent_results = do_continent( + kwargs, cache, clientCountry, [], header) + if len(geoip_results) + len(continent_results) >= 3: + done = 1 + + if not done: + header, global_results = do_global( + kwargs, cache, clientCountry, header) + + def _random_shuffle(s): + l = list(s) + random.shuffle(l) + return l + + def _ordered_netblocks(s): + def ipy_len(t): + (prefix, hostid) = t + return IP(prefix).len() + v4_netblocks = [] + v6_netblocks = [] + for (prefix, hostid) in s: + ip = IP(prefix) + if ip.version() == 4: + v4_netblocks.append((prefix, hostid)) + elif ip.version() == 6: + v6_netblocks.append((prefix, hostid)) + # mix up the order, as sort will preserve same-key ordering + random.shuffle(v4_netblocks) + v4_netblocks.sort(key=ipy_len) + random.shuffle(v6_netblocks) + v6_netblocks.sort(key=ipy_len) + v4_netblocks = [t[1] for t in v4_netblocks] + v6_netblocks = [t[1] for t in v6_netblocks] + return v6_netblocks + v4_netblocks + + def whereismymirror(result_sets): + return_string = 'None' + allhosts = [] + found = False + for (l,s,f) in result_sets: + if len(l) > 0: + allhosts.extend(f(l)) + if not found: + return_string = s + found = True + + allhosts = uniqueify(allhosts) + return allhosts, return_string + + result_sets = [ + (location_results, "location", _random_shuffle), + (netblock_results, "netblocks", _ordered_netblocks), + (asn_results, "asn", _random_shuffle), + (internet2_results, "I2", _random_shuffle), + (country_results, "country", shuffle), + (geoip_results, "geoip", shuffle), + (continent_results, "continent", shuffle), + 
(global_results, "global", shuffle), + ] + + allhosts, where_string = whereismymirror(result_sets) + try: + ip_str = kwargs['IP'].strNormal() + except: + ip_str = 'Unknown IP' + log_string = "mirrorlist: %s found its best mirror from %s" % ( + ip_str, where_string) + syslogger.info(log_string) + + hosts_and_urls = append_path( + allhosts, cache, file, pathIsDirectory=pathIsDirectory) + + if 'metalink' in kwargs and kwargs['metalink']: + (resulttype, returncode, results)=metalink( + cache, dir, file, hosts_and_urls) + d = dict( + message=None, + resulttype=resulttype, + returncode=returncode, + results=results) + return d + + else: + host_url_list = trim_to_preferred_protocols(hosts_and_urls) + d = dict( + message=header, + resulttype='mirrorlist', + returncode=200, + results=host_url_list) + return d + + +def setup_cache_tree(cache, field): + tree = radix.Radix() + for k, v in cache.iteritems(): + node = tree.add(k.strNormal()) + node.data[field] = v + return tree + + +def setup_netblocks(netblocks_file, asns_wanted=None): + tree = radix.Radix() + if netblocks_file is not None: + try: + f = open(netblocks_file, 'r') + except: + return tree + for l in f: + try: + s = l.split() + start, mask = s[0].split('/') + mask = int(mask) + if mask == 0: continue + asn = int(s[1]) + if asns_wanted is None or asn in asns_wanted: + node = tree.add(s[0]) + node.data['asn'] = asn + except: + pass + f.close() + return tree + + +def read_caches(): + global mirrorlist_cache + global host_netblock_cache + global host_country_allowed_cache + global host_max_connections_cache + global repo_arch_to_directoryname + global repo_redirect + global country_continent_redirect_cache + global disabled_repositories + global host_bandwidth_cache + global host_country_cache + global file_details_cache + global hcurl_cache + global asn_host_cache + global location_cache + global netblock_country_cache + + data = {} + try: + f = open(cachefile, 'r') + data = pickle.load(f) + f.close() + except: + pass + + if 'mirrorlist_cache' in data: + mirrorlist_cache = data['mirrorlist_cache'] + if 'host_netblock_cache' in data: + host_netblock_cache = data['host_netblock_cache'] + if 'host_country_allowed_cache' in data: + host_country_allowed_cache = data['host_country_allowed_cache'] + if 'repo_arch_to_directoryname' in data: + repo_arch_to_directoryname = data['repo_arch_to_directoryname'] + if 'repo_redirect_cache' in data: + repo_redirect = data['repo_redirect_cache'] + if 'country_continent_redirect_cache' in data: + country_continent_redirect_cache = data[ + 'country_continent_redirect_cache'] + if 'disabled_repositories' in data: + disabled_repositories = data['disabled_repositories'] + if 'host_bandwidth_cache' in data: + host_bandwidth_cache = data['host_bandwidth_cache'] + if 'host_country_cache' in data: + host_country_cache = data['host_country_cache'] + if 'file_details_cache' in data: + file_details_cache = data['file_details_cache'] + if 'hcurl_cache' in data: + hcurl_cache = data['hcurl_cache'] + if 'asn_host_cache' in data: + asn_host_cache = data['asn_host_cache'] + if 'location_cache' in data: + location_cache = data['location_cache'] + if 'netblock_country_cache' in data: + netblock_country_cache = data['netblock_country_cache'] + if 'host_max_connections_cache' in data: + host_max_connections_cache = data['host_max_connections_cache'] + + setup_continents() + global internet2_tree + global global_tree + global host_netblocks_tree + global netblock_country_tree + + internet2_tree = 
setup_netblocks(internet2_netblocks_file) + global_tree = setup_netblocks(global_netblocks_file, asn_host_cache) + # host_netblocks_tree key is a netblock, value is a list of host IDs + host_netblocks_tree = setup_cache_tree(host_netblock_cache, 'hosts') + # netblock_country_tree key is a netblock, value is a single country string + netblock_country_tree = setup_cache_tree( + netblock_country_cache, 'country') + + +def errordoc(metalink, message): + if metalink: + doc = metalink_failuredoc(message) + else: + doc = message + return doc + + +class MirrorlistHandler(StreamRequestHandler): + def handle(self): + signal.signal(signal.SIGHUP, signal.SIG_IGN) + random.seed() + try: + # read size of incoming pickle + readlen = 0 + size = '' + while readlen < 10: + size += self.rfile.read(10 - readlen) + readlen = len(size) + size = atoi(size) + + # read the pickle + readlen = 0 + p = '' + while readlen < size: + p += self.rfile.read(size - readlen) + readlen = len(p) + d = pickle.loads(p) + self.connection.shutdown(socket.SHUT_RD) + except: + pass + + try: + try: + r = do_mirrorlist(d) + except: + raise + message = r['message'] + results = r['results'] + resulttype = r['resulttype'] + returncode = r['returncode'] + except Exception, e: + message=u'# Bad Request %s\n# %s' % (e, d) + exception_msg = traceback.format_exc(e) + sys.stderr.write(message+'\n') + sys.stderr.write(exception_msg) + sys.stderr.flush() + returncode = 400 + results = [] + resulttype = 'mirrorlist' + if d['metalink']: + resulttype = 'metalink' + results = errordoc(d['metalink'], message) + + try: + p = pickle.dumps({ + 'message':message, + 'resulttype':resulttype, + 'results':results, + 'returncode':returncode}) + self.connection.sendall(zfill('%s' % len(p), 10)) + + self.connection.sendall(p) + self.connection.shutdown(socket.SHUT_WR) + except: + pass + + +def sighup_handler(signum, frame): + global logfile + if logfile is not None: + name = logfile.name + logfile.close() + logfile = open(name, 'a') + + # put this in a separate thread so it doesn't block clients + if threading.active_count() < 2: + thread = threading.Thread(target=load_databases_and_caches) + thread.daemon = False + try: + thread.start() + except KeyError: + # bug fix for handing an exception when unable to delete from + #_limbo even though it's not in limbo + # https://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/dist27/threading.py?r=327 + pass + + +def sigterm_handler(signum, frame): + global must_die + signal.signal(signal.SIGHUP, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + if signum == signal.SIGTERM: + must_die = True + + +class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): + request_queue_size = 300 + def finish_request(self, request, client_address): + signal.signal(signal.SIGHUP, signal.SIG_IGN) + BaseServer.finish_request(self, request, client_address) + + +def parse_args(): + global cachefile + global socketfile + global internet2_netblocks_file + global global_netblocks_file + global debug + global logfile + global pidfile + opts, args = getopt.getopt( + sys.argv[1:], "c:i:g:p:s:dl:", + [ + "cache", "internet2_netblocks", "global_netblocks", + "pidfile", "socket", "debug", "log=" + ] + ) + for option, argument in opts: + if option in ("-c", "--cache"): + cachefile = argument + if option in ("-i", "--internet2_netblocks"): + internet2_netblocks_file = argument + if option in ("-g", "--global_netblocks"): + global_netblocks_file = argument + if option in ("-s", "--socket"): + socketfile 
= argument + if option in ("-p", "--pidfile"): + pidfile = argument + if option in ("-l", "--log"): + try: + logfile = open(argument, 'a') + except: + logfile = None + if option in ("-d", "--debug"): + debug = True + + +def open_geoip_databases(): + global gipv4 + global gipv6 + try: + gipv4 = GeoIP.open( + "/usr/share/GeoIP/GeoIP.dat", GeoIP.GEOIP_STANDARD) + except: + gipv4=None + try: + gipv6 = GeoIP.open( + "/usr/share/GeoIP/GeoIPv6.dat", GeoIP.GEOIP_STANDARD) + except: + gipv6=None + + +def convert_6to4_v4(ip): + all_6to4 = IP('2002::/16') + if ip.version() != 6 or ip not in all_6to4: + return None + parts=ip.strNormal().split(':') + + ab = int(parts[1],16) + a = (ab >> 8) & 0xFF + b = ab & 0xFF + cd = int(parts[2],16) + c = (cd >> 8) & 0xFF + d = cd & 0xFF + + v4addr = '%d.%d.%d.%d' % (a,b,c,d) + return IP(v4addr) + + +def convert_teredo_v4(ip): + teredo_std = IP('2001::/32') + teredo_xp = IP('3FFE:831F::/32') + if ip.version() != 6 or (ip not in teredo_std and ip not in teredo_xp): + return None + parts=ip.strNormal().split(':') + + ab = int(parts[6],16) + a = ((ab >> 8) & 0xFF) ^ 0xFF + b = (ab & 0xFF) ^ 0xFF + cd = int(parts[7],16) + c = ((cd >> 8) & 0xFF) ^ 0xFF + d = (cd & 0xFF) ^ 0xFF + + v4addr = '%d.%d.%d.%d' % (a,b,c,d) + return IP(v4addr) + + +def load_databases_and_caches(*args, **kwargs): + sys.stderr.write("load_databases_and_caches...") + sys.stderr.flush() + open_geoip_databases() + read_caches() + sys.stderr.write("done.\n") + sys.stderr.flush() + + +def remove_pidfile(pidfile): + os.unlink(pidfile) + + +def create_pidfile_dir(pidfile): + piddir = os.path.dirname(pidfile) + if not piddir: + return + try: + os.makedirs(piddir, mode=0755) + except OSError, err: + if err.errno == 17: # File exists + pass + else: + raise + except: + raise + + +def write_pidfile(pidfile, pid): + create_pidfile_dir(pidfile) + f = open(pidfile, 'w') + f.write(str(pid)+'\n') + f.close() + return 0 + + +def manage_pidfile(pidfile): + """returns 1 if another process is running that is named in pidfile, + otherwise creates/writes pidfile and returns 0.""" + pid = os.getpid() + try: + f = open(pidfile, 'r') + except IOError, err: + if err.errno == 2: # No such file or directory + return write_pidfile(pidfile, pid) + return 1 + + oldpid=f.read() + f.close() + + # is the oldpid process still running? + try: + os.kill(int(oldpid), 0) + except ValueError: # malformed oldpid + return write_pidfile(pidfile, pid) + except OSError, err: + if err.errno == 3: # No such process + return write_pidfile(pidfile, pid) + return 1 + + +def main(): + global logfile + global pidfile + signal.signal(signal.SIGHUP, signal.SIG_IGN) + parse_args() + manage_pidfile(pidfile) + + oldumask = os.umask(0) + try: + os.unlink(socketfile) + except: + pass + + load_databases_and_caches() + signal.signal(signal.SIGHUP, sighup_handler) + # restart interrupted syscalls like select + signal.siginterrupt(signal.SIGHUP, False) + ss = ForkingUnixStreamServer(socketfile, MirrorlistHandler) + + while not must_die: + try: + ss.serve_forever() + except select.error: + pass + + try: + os.unlink(socketfile) + except: + pass + + if logfile is not None: + try: + logfile.close() + except: + pass + + remove_pidfile(pidfile) + return 0 + + +if __name__ == "__main__": + try: + sys.exit(main()) + except KeyboardInterrupt: + sys.exit(-1)
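For illustration, here is a minimal sketch (not the exact stdlib code) of the loop that Python 2.7's ForkingMixIn runs in collect_children() before forking a child for each request. Once the number of live children reaches max_children, the parent blocks in os.waitpid() until some child exits; with 45 wsgi client processes sending requests to a server capped at 40 children, that wait is where the hangs would show up. Raising max_children to 80 keeps the parent below the blocking threshold even if every wsgi process has a request in flight.

import os

def collect_children(active_children, max_children=40):
    # Simplified version of what SocketServer.ForkingMixIn.collect_children()
    # does: while at or above max_children, block until *any* child exits.
    while active_children and len(active_children) >= max_children:
        try:
            pid, _status = os.waitpid(-1, 0)  # blocks the accept loop
            active_children.discard(pid)      # may reap a child we didn't spawn
        except OSError:
            break
    # Below the threshold: reap already-exited children without blocking.
    for pid in list(active_children):
        try:
            done, _status = os.waitpid(pid, os.WNOHANG)
            if done:
                active_children.discard(pid)
        except OSError:
            active_children.discard(pid)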