#! /usr/bin/python
# canonurls.py - canonicalize and clean a list of URLs. -*- encoding: utf-8 -*-
# Copyright © 2010 Zack Weinberg
# Portions © 2009 Serge Broslavsky
#
# Copying and distribution of this program, with or without modification,
# are permitted in any medium without royalty provided the copyright
# notice and this notice are preserved.  This program is offered as-is,
# without any warranty.

import re
import sys

try:
    import cookielib
    import httplib
    import urlparse
except ImportError:
    # Python 3 renamed these modules; bind them to the names used below.
    import http.cookiejar as cookielib
    import http.client as httplib
    import urllib.parse as urlparse

# Parsed command-line options.  Filled in by the __main__ block at the
# bottom of the file; stays None when this file is imported as a module.
options = None

# Default cap on the number of redirections followed for a single URL.
MAX_REDIRECTS = 20

# Status codes that chase_redirects follows; every other non-2xx status
# is treated as a hard error.
REDIRECT_STATUSES = (301, 302, 303, 307, 308)

# What a URL scheme may look like (RFC 3986 section 3.1).
_SCHEME_RE = re.compile(r'^[A-Za-z][A-Za-z0-9+.-]*$')


# Custom canonization function which treats "foo.com/blah" as equivalent to
# "http://foo.com/blah" rather than as a partial URL with no host or scheme
# (as urlparse.urlsplit does).
def canonize(url):
    """Return *url* with an explicit scheme and a non-empty path.

    The scheme and host are lowercased; everything after the host is
    left untouched.  A URL without a scheme is assumed to be "http".
    """
    (scheme, sep, rest) = url.partition('://')
    if sep == '' or not _SCHEME_RE.match(scheme):
        # No scheme at all, or the first "://" belongs to something later
        # in the URL (e.g. a URL embedded in a query string).
        rest = url
        scheme = 'http'
    else:
        scheme = scheme.lower()

    # The host ends at the first "/", "?" or "#", whichever comes first,
    # so that a query directly after the host is not lowercased with it.
    end = len(rest)
    for delim in '/?#':
        pos = rest.find(delim)
        if pos != -1 and pos < end:
            end = pos
    host = rest[:end].lower()
    path = rest[end:]
    if not path.startswith('/'):
        path = '/' + path
    return scheme + "://" + host + path


def _strip_fragment(url):
    """Return *url* without its fragment, normalized the same way
    HTTPRequest.get_full_url normalizes the URL it was built from."""
    return urlparse.urlsplit(urlparse.urldefrag(url)[0]).geturl()


# We have to do the HTTP queries by hand because of urllib bugs and
# sites that misbehave if they see its default user-agent.  This means
# we have to create a shim between httplib and cookielib, because
# cookielib assumes you're using urllib.  Thanks to Serge Broslavsky
# for this shim class:
# http://stackoverflow.com/questions/1016765/how-to-use-cookielib-with-httplib-in-python
class HTTPRequest(object):
    """
    Data container for HTTP request (used for cookie processing).

    Implements the request methods that cookielib calls, plus fire(),
    which actually performs the GET.  Header names are matched without
    regard to case.
    """

    def __init__(self, url, headers=None):
        """Prepare a GET request for *url* (any fragment is dropped).

        *headers*, if given, is a mapping of extra request headers; an
        entry replaces a default header of the same name.
        """
        self._url = urlparse.urlsplit(urlparse.urldefrag(url)[0])
        self._headers = {
            "User-Agent": ("Mozilla/5.0 (Macintosh; rv:1.9.1) "
                           "Gecko/20100101 Firefox/3.6"),
            "Connection": "close",
        }
        if headers:
            for key, value in headers.items():
                self.add_header(key, value)

    def _find_key(self, name):
        """Return the stored spelling of header *name*, or None."""
        wanted = name.lower()
        for key in self._headers:
            if key.lower() == wanted:
                return key
        return None

    def has_header(self, name):
        """Tell whether header *name* is set."""
        return self._find_key(name) is not None

    def add_header(self, key, val):
        """Set header *key* to *val*, replacing any existing value."""
        existing = self._find_key(key)
        if existing is not None:
            # Drop the old spelling so the header is never sent twice.
            del self._headers[existing]
        self._headers[key.capitalize()] = val

    # Each request is sent exactly once, so "unredirected" headers need
    # no separate storage.
    add_unredirected_header = add_header

    def is_unverifiable(self):
        return True

    def get_type(self):
        return self._url.scheme

    def get_full_url(self):
        return self._url.geturl()

    def get_header(self, header_name, default=None):
        """Return the value of *header_name*, or *default* if unset."""
        key = self._find_key(header_name)
        if key is None:
            return default
        return self._headers[key]

    def get_host(self):
        return self._url.netloc

    get_origin_req_host = get_host

    def get_headers(self):
        return self._headers

    # Python 3's http.cookiejar reads these as attributes instead of
    # calling the get_*/is_* methods above.
    type = property(get_type)
    host = property(get_host)
    origin_req_host = property(get_host)
    unverifiable = property(is_unverifiable)
    full_url = property(get_full_url)

    def fire(self):
        """Send the request and return the httplib response object.

        Raises IOError for any scheme other than http or https.  The
        caller is responsible for closing the returned response.
        """
        scheme = self._url.scheme
        if scheme == 'http':
            conn = httplib.HTTPConnection(self._url.netloc)
        elif scheme == 'https':
            conn = httplib.HTTPSConnection(self._url.netloc)
        else:
            raise IOError("unsupported URL '%s'" % self.get_full_url())

        # A URL such as "http://host?q" has an empty path; the request
        # target must still begin with "/".
        path = self._url.path or '/'
        if self._url.query != "":
            path = path + '?' + self._url.query

        try:
            conn.request("GET", path, headers=self._headers)
            resp = conn.getresponse()
        except Exception:
            # Don't leave the connection open when the request fails.
            conn.close()
            raise

        # patch httplib response to look like urllib2 response,
        # for the sake of cookie processing
        resp.info = lambda: resp.msg
        return resp


def _verbose():
    """Tell whether progress messages are enabled.

    False while `options` is still None, i.e. when the functions below
    are used without going through the command-line entry point.
    """
    return bool(getattr(options, "verbose", False))


def log_status(resp):
    sys.stderr.write("%s %s" % (resp.status, resp.reason))


def log_cookies(jar):
    if _verbose():
        for cookie in jar:
            sys.stderr.write(" %s=%s" % (cookie.name, cookie.value))


def log_start(url):
    if _verbose():
        sys.stderr.write("%s => " % url)


def log_stop(resp, url):
    if _verbose():
        if resp.status != 200:
            log_status(resp)
            sys.stderr.write(" ")
        sys.stderr.write("%s\n" % url)


def log_bad_redirect(resp):
    if _verbose():
        log_status(resp)
        sys.stderr.write(" to nowhere\n")


def log_redirect_loop(resp, url):
    if _verbose():
        log_status(resp)
        sys.stderr.write("; loop detected at %s\n" % url)


def log_good_redirect(resp, url, cookies):
    if _verbose():
        log_status(resp)
        sys.stderr.write(" to %s" % url)
        log_cookies(cookies)
        sys.stderr.write("\n => ")


def chase_redirects(url, max_redirects=MAX_REDIRECTS):
    """Follow HTTP redirections from *url* and return the final URL.

    Cookies set along the way are sent back on later requests in the
    same chain.  The chain ends, and the current URL is returned, on a
    2xx response, on a redirect that names no target, or when a redirect
    points at a URL already visited.

    Raises IOError for any other status, for an unsupported scheme, and
    when more than *max_redirects* redirections are followed.
    """
    seen = set()
    cookies = cookielib.CookieJar()
    url = canonize(url)
    log_start(url)
    hops = 0
    while True:
        req = HTTPRequest(url)
        url = req.get_full_url()
        seen.add(url)
        cookies.add_cookie_header(req)
        resp = req.fire()
        try:
            if 200 <= resp.status < 300:
                # done, yay
                log_stop(resp, url)
                return url

            if resp.status not in REDIRECT_STATUSES:
                # treat any 1xx and 3xx codes that we don't understand as
                # hard errors, as well as 4xx/5xx
                log_stop(resp, url)
                raise IOError("%s %s" % (resp.status, resp.reason))

            # redirected, so where to?
            location = resp.getheader("Location")
            if location is None:
                location = resp.getheader("Uri")
            if location is None:
                log_bad_redirect(resp)
                return url

            # pick up any cookies attached to the redirection
            cookies.extract_cookies(resp, req)

            # update the url; drop the fragment so the comparison against
            # `seen` (which holds fragment-free URLs) can detect a loop
            url = _strip_fragment(urlparse.urljoin(url, location))
            if url in seen:
                log_redirect_loop(resp, url)
                return url

            hops += 1
            if hops > max_redirects:
                raise IOError("too many redirects (more than %d)"
                              % max_redirects)
            log_good_redirect(resp, url, cookies)
            # and loop
        finally:
            # The body is never read.  Close explicitly: the lambda that
            # fire() attaches to the response refers back to it, so
            # reference counting alone will not release the socket.
            resp.close()


def sanitize_urls(urls):
    """Resolve every URL in *urls* and print the distinct results.

    *urls* is an iterable of lines, one URL each; blank lines are
    skipped.  URLs that cannot be resolved are dropped (with a message
    on stderr in verbose mode).  The surviving final URLs are written to
    stdout, sorted, one per line.
    """
    results = {}
    for url in urls:
        url = url.strip()
        if not url:
            continue
        try:
            resolved = chase_redirects(url)
        except (EnvironmentError, httplib.HTTPException) as e:
            # HTTPException (malformed responses and the like) does not
            # derive from EnvironmentError, so it is listed separately.
            if _verbose():
                sys.stderr.write(" => error: %s\n" % e)
            continue
        if resolved in results:
            if _verbose():
                sys.stderr.write(" => duplicates %s\n" % results[resolved])
        else:
            results[resolved] = url

    for url in sorted(results):
        sys.stdout.write(url + "\n")


if __name__ == '__main__':
    import fileinput
    import optparse

    op = optparse.OptionParser(
        usage="usage: %prog [options] lists ... > output",
        version="%prog 1.0")
    op.add_option("-q", "--quiet",
                  action="store_false", dest="verbose", default=True,
                  help="don't print progress messages to stderr")
    (options, args) = op.parse_args()
    sanitize_urls(fileinput.input(args))