#!/usr/bin/env python # -*- coding: utf-8 -*- """ .(' /%/\\' (%(%))' curl' Before you can use curlish you need to register the site with the curlish client. For that you can use the --add-site parameter which will walk you through the process. example: $ curlish https://graph.facebook.com/me Notes on the authorization_code grant type: curlish spawns an HTTP server that handles a single request on http://127.0.0.1:62231/ which acts as a valid redirect target. If you need the authorization_code grant, let it redirect there. common curl options: -v verbose mode -i prints the headers -X METHOD specifies the method -H "Header: value" emits a header with a value -d "key=value" emits a pair of form data curl extension options: METHOD shortcut for -XMETHOD if it's one of the known HTTP methods. -J key=value transmits a JSON string value. -J key:=value transmits raw JSON data for a key (bool int etc.) """ from __future__ import with_statement import os import re import sys import cgi import webbrowser import argparse try: import json from json.encoder import JSONEncoder except ImportError: import simplejson as json from simplejson.encoder import JSONEncoder import urllib import urlparse import subprocess import base64 from copy import deepcopy from httplib import HTTPConnection, HTTPSConnection from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from getpass import getpass from uuid import UUID def str_to_uuid(s): try: UUID(s) return s except: print "%s is not a valid UUID" % s sys.exit(1) KNOWN_HTTP_METHODS = set(['GET', 'POST', 'HEAD', 'PUT', 'OPTIONS', 'TRACE', 'DELETE', 'PATCH']) DEFAULT_SETTINGS = { 'curl_path': None, 'http_port': 62231, 'json_indent': 2, 'colors': { 'statusline_ok': 'green', 'statusline_error': 'red', 'header': 'teal', 'brace': 'teal', 'operator': None, 'constant': 'blue', 'number': 'purple', 'string': 'yellow', 'objstring': 'green' }, 'sites': { "facebook": { "extra_headers": {}, "request_token_params": { "scope": "email" }, "authorize_url": "https://www.facebook.com/dialog/oauth", "base_url": "https://graph.facebook.com/", "client_id": "384088028278656", "client_secret": "14c75a494cda2e11e8760095ec972915", "grant_type": "authorization_code", "access_token_url": "/oauth/access_token" } }, 'token_cache': {} } ANSI_CODES = { 'black': '\x1b[30m', 'blink': '\x1b[05m', 'blue': '\x1b[34m', 'bold': '\x1b[01m', 'faint': '\x1b[02m', 'green': '\x1b[32m', 'purple': '\x1b[35m', 'red': '\x1b[31m', 'reset': '\x1b[39;49;00m', 'standout': '\x1b[03m', 'teal': '\x1b[36m', 'underline': '\x1b[04m', 'white': '\x1b[37m', 'yellow': '\x1b[33m' } _list_marker = object() _value_marker = object() def decode_flat_data(pairiter): def _split_key(name): result = name.split('.') for idx, part in enumerate(result): if part.isdigit(): result[idx] = int(part) return result def _enter_container(container, key): if key not in container: return container.setdefault(key, {_list_marker: False}) return container[key] def _convert(container): if _value_marker in container: force_list = False values = container.pop(_value_marker) if container.pop(_list_marker): force_list = True values.extend(_convert(x[1]) for x in sorted(container.items())) if not force_list and len(values) == 1: values = values[0] return values elif container.pop(_list_marker): return [_convert(x[1]) for x in sorted(container.items())] return dict((k, _convert(v)) for k, v in container.iteritems()) result = {_list_marker: False} for key, value in pairiter: parts = _split_key(key) if not parts: continue container = result for part in parts: last_container = container container = _enter_container(container, part) last_container[_list_marker] = isinstance(part, (int, long)) container[_value_marker] = [value] return _convert(result) def get_color(element): user_colors = settings.values['colors'] name = user_colors.get(element) if name is None and element not in user_colors: name = DEFAULT_SETTINGS['colors'].get(element) if name is not None: return ANSI_CODES.get(name, '') return '' def isatty(): """Is stdout connected to a terminal or a file?""" if not hasattr(sys.stdout, 'isatty'): return False if not sys.stdout.isatty(): return False return True def is_color_terminal(): """Returns `True` if this terminal has colors.""" if not isatty(): return False if 'COLORTERM' in os.environ: return True term = os.environ.get('TERM', 'dumb').lower() if term in ('xterm', 'linux') or 'color' in term: return True return False def fail(message): """Fails with an error message.""" print >> sys.stderr, 'error:', message sys.exit(1) def find_url_arg(arguments): """Finds the URL argument in a curl argument list.""" for idx, arg in enumerate(arguments): if arg.startswith(('http:', 'https:')): return idx class AuthorizationHandler(BaseHTTPRequestHandler): """Callback handler for the code based authorization""" def do_GET(self): self.send_response(200, 'OK') self.send_header('Content-Type', 'text/html') self.end_headers() self.server.token_response = dict((k, v[-1]) for k, v in cgi.parse_qs(self.path.split('?')[-1]).iteritems()) if 'code' in self.server.token_response: title = 'Tokens Received' text = 'The tokens were transmitted successfully to curlish.' else: title = 'Error on Token Exchange' text = 'Could not exchange tokens :-(' self.wfile.write(''' %(title)s

%(title)s

%(text)s

You can now close this window, it's no longer needed. ''' % locals()) self.wfile.close() def log_message(self, *args, **kwargs): pass class Settings(object): """Wrapper around the settings file""" def __init__(self): if os.name == 'nt': self.filename = os.path.expandvars(r'%APPDATA%\\FireteamCurlish\\config.json') else: self.filename = os.path.expanduser(r'~/.ftcurlish.json') rv = deepcopy(DEFAULT_SETTINGS) if os.path.isfile(self.filename): with open(self.filename) as f: try: rv.update(json.load(f)) except Exception: pass if not rv['curl_path']: rv['curl_path'] = get_default_curl_path() self.values = rv def save(self): dirname = os.path.dirname(self.filename) try: os.makedirs(dirname) except OSError: pass with open(self.filename, 'w') as f: json.dump(self.values, f, indent=2) class Site(object): """Represents a single site.""" def __init__(self, name, values): def _full_url(url): if self.base_url is not None: return urlparse.urljoin(self.base_url, url) return url self.name = name self.base_url = values.get('base_url') self.grant_type = values.get('grant_type', 'authorization_code') self.access_token_url = _full_url(values.get('access_token_url')) self.authorize_url = _full_url(values.get('authorize_url')) self.client_id = values.get('client_id') self.client_secret = values.get('client_secret') self.request_token_params = values.get('request_token_params') or {} self.extra_headers = values.get('extra_headers') or {} self.bearer_transmission = values.get('bearer_transmission', 'query') self.access_token = None def make_request(self, method, url, headers=None, data=None): """Makes an HTTP request to the site.""" u = urlparse.urlparse(url) pieces = u.netloc.rsplit(':', 1) secure = u.scheme == 'https' host = pieces[0].strip('[]') if len(pieces) == 2 and pieces[-1].isdigit(): port = int(pieces[-1]) else: port = secure and 443 or 80 conncls = secure and HTTPSConnection or HTTPConnection conn = conncls(host, port) if isinstance(data, dict): data = urllib.urlencode(data) real_headers = self.extra_headers.copy() real_headers.update(headers or ()) conn.request(method, u.path, data, real_headers) resp = conn.getresponse() ct = resp.getheader('Content-Type') if ct.startswith('application/json') or ct.startswith('text/javascript'): resp_data = json.loads(resp.read()) elif ct.startswith('text/html'): fail('Invalid response from server: ' + resp.read()) else: resp_data = dict((k, v[-1]) for k, v in cgi.parse_qs(resp.read()).iteritems()) return resp.status, resp_data def get_access_token(self, params): """Tries to load tokens with the given parameters.""" data = params.copy() # Provide the credentials both as a basic authorization header as well as # the parameters in the URL. Should make everybody happy. At least I hope so. data['client_id'] = self.client_id data['client_secret'] = self.client_secret creds = self.client_id + ':' + self.client_secret headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8', 'Authorization': 'Basic ' + base64.b64encode(creds)} status, data = self.make_request('POST', self.access_token_url, data=data, headers=headers) if status == 200: return data['access_token'] error = data.get('error') if error in ('invalid_grant', 'access_denied'): return None error_msg = data.get('error_description') fail("Couldn't authorize: %s - %s" % (error, error_msg)) def request_password_grant(self): while 1: params = {'grant_type': 'password'} params['username'] = raw_input('Username: ') params['password'] = getpass() params.update(self.request_token_params) rv = self.get_access_token(params) if rv is None: print 'Error: invalid credentials' continue settings.values['token_cache'][self.name] = rv return def request_authorization_code_grant(self): redirect_uri = 'http://127.0.0.1:%d/' % settings.values['http_port'] params = { 'client_id': self.client_id, 'redirect_uri': redirect_uri, 'response_type': 'code' } params.update(self.request_token_params) browser_url = '%s?%s' % ( self.authorize_url, urllib.urlencode(params) ) webbrowser.open(browser_url) server_address = ('127.0.0.1', settings.values['http_port']) httpd = HTTPServer(server_address, AuthorizationHandler) httpd.token_response = None httpd.handle_request() if 'code' in httpd.token_response: return self.exchange_code_for_token(httpd.token_response['code'], redirect_uri) print 'Could not sign in: grant cancelled' for key, value in httpd.token_response.iteritems(): print ' %s: %s' % (key, value) sys.exit(1) def exchange_code_for_token(self, code, redirect_uri): settings.values['token_cache'][self.name] = self.get_access_token({ 'code': code, 'grant_type': 'authorization_code', 'redirect_uri': redirect_uri }) def request_tokens(self): if self.grant_type == 'password': self.request_password_grant() elif self.grant_type == 'authorization_code': self.request_authorization_code_grant() else: fail('Invalid grant configured: %s' % self.grant_type) def fetch_token_if_necessarys(self): token_cache = settings.values['token_cache'] if token_cache.get(self.name) is None: self.request_tokens() self.access_token = token_cache[self.name] def get_site_by_name(name): """Finds a site by its name.""" rv = settings.values['sites'].get(name) if rv is not None: return Site(name, rv) def get_site(site_name, url_arg): """Tries to look up a site from the config or automatically.""" if site_name is not None: site = get_site_by_name(site_name) if site is not None: return site fail('Site %s does not exist' % site_name) matches = [] for name, site in settings.values['sites'].iteritems(): base_url = site.get('base_url') if base_url and url_arg.startswith(base_url): matches.append(Site(name, site)) break if len(matches) == 1: return matches[0] if len(matches) > 1: fail('Too many matches. Please specificy an application ' 'explicitly') def get_default_curl_path(): """Tries to find curl and returns the path to it.""" def tryrun(path): subprocess.call([path, '--version'], stdout=subprocess.PIPE, stdin=subprocess.PIPE) return True if tryrun('curl'): return 'curl' base = os.path.abspath(os.path.dirname(__file__)) for name in 'curl', 'curl.exe': fullpath = os.path.join(base, name) if tryrun(fullpath): return fullpath def colorize_json_stream(iterator): """Adds colors to a JSON event stream.""" for event in iterator: color = None e = event.strip() if e in '[]{}': color = get_color('brace') elif e in ',:': color = get_color('operator') elif e[:1] == '"': color = get_color('string') elif e in ('true', 'false', 'null'): color = get_color('constant') else: color = get_color('number') if color is not None: event = color + event + ANSI_CODES['reset'] yield event def print_formatted_json(json_data): """Reindents JSON and colorizes if wanted. We use our own wrapper around json.dumps because we want to inject colors and the simplejson iterator encoder does some buffering between separate events that makes it really hard to inject colors. """ if is_color_terminal(): def colorize(colorname, text): color = get_color(colorname) reset = ANSI_CODES['reset'] return color + text + reset else: colorlize = lambda x: x def _walk(obj, indentation, inline=False, w=sys.stdout.write): i = ' ' * (indentation * settings.values['json_indent']) if not inline: w(i) if isinstance(obj, basestring): w(colorize('string', json.dumps(obj))) elif isinstance(obj, (int, long, float)): w(colorize('number', json.dumps(obj))) elif obj in (True, False, None): w(colorize('constant', json.dumps(obj))) elif isinstance(obj, list): if not obj: w(colorize('brace', '[]')) else: w(colorize('brace', '[\n')) for idx, item in enumerate(obj): if idx: w(colorize('operator', ',\n')) _walk(item, indentation + 1) w(colorize('brace', '\n' + i + ']')) elif isinstance(obj, dict): if not obj: w(colorize('brace', '{}')) else: w(colorize('brace', '{\n')) for idx, (key, value) in enumerate(obj.iteritems()): if idx: w(colorize('operator', ',\n')) ki = i + ' ' * settings.values['json_indent'] w(ki + colorize('objstring', json.dumps(key))) w(colorize('operator', ': ')) _walk(value, indentation + 1, inline=True) w(i + colorize('brace', '\n' + i + '}')) else: # hmm. should not happen, but let's just assume it might # because of json changes w(json.dumps(obj)) _walk(json_data, 0) sys.stdout.write('\n') sys.stdout.flush() def beautify_curl_output(iterable, hide_headers): """Parses curl output and adds colors and reindents as necessary.""" json_body = False has_colors = is_color_terminal() # Headers for line in iterable: if has_colors and re.search(r'^HTTP/', line): if re.search('HTTP/\d+.\d+ [45]\d+', line): color = get_color('statusline_error') else: color = get_color('statusline_ok') sys.stdout.write(color + line + ANSI_CODES['reset']) continue if re.search(r'^Content-Type:\s*(text/javascript|application/(.+?\+)?json)\s*(?i)', line): json_body = True if not hide_headers: # Nicer headers if we detect them if not line.startswith(' ') and ':' in line: key, value = line.split(':', 1) else: key = None if has_colors and key is not None: sys.stdout.write(get_color('header') + key + ANSI_CODES['reset'] + ': ' + value.lstrip()) else: sys.stdout.write(line) sys.stdout.flush() if line == '\r\n': break # JSON Body. Do not reindent if we have headers and are piping # into a file because of changing content length. if json_body and (hide_headers or isatty()): data = json.loads(''.join(iterable)) print_formatted_json(data) # Regular body else: for line in iterable: sys.stdout.write(line) sys.stdout.flush() def clear_token_cache(site_name): """Delets all tokens or the token of a site.""" site = None if site_name is not None: site = get_site_by_name(site_name) if site is None: fail('Site %s does not exist' % site_name) if site is None: settings.values['token_cache'] = {} print 'Cleared the token cache' else: settings.values['token_cache'].pop(site.name, None) print 'Cleared the token cache for %s' % site.name settings.save() def add_site(site_name): """Registers a new site with the config.""" def prompt(prompt, one_of=None, default=None): if default is not None: prompt += ' [%s]' % default if one_of: prompt += ' (options=%s)' % ', '.join(sorted(one_of)) while 1: value = raw_input(prompt + ': ') if value: if one_of and value not in one_of: print 'error: invalid value' continue return value if default is not None: return default base_url = prompt('base_url') if prompt('Configure OAuth 2.0?', ['yes', 'no'], 'yes') == 'yes': grant_type = prompt('grant_type', one_of=['password', 'authorization_code'], default='authorization_code') access_token_url = prompt('access_token_url') if grant_type == 'authorization_code': authorize_url = prompt('authorize_url') client_id = prompt('client_id') client_secret = prompt('client_secret') bearer_transmission = prompt('bearer_transmission', one_of=['header', 'query'], default='query') else: grant_type = None access_token_url = None client_id = None client_secret = None bearer_transmission = None settings.values['sites'][site_name] = { 'extra_headers': {}, 'request_token_params': {}, 'base_url': base_url, 'grant_type': grant_type, 'base_url': base_url, 'access_token_url': access_token_url, 'client_id': client_id, 'client_secret': client_secret, 'grant_type': grant_type, 'bearer_transmission': bearer_transmission } settings.values['token_cache'].pop(site_name, None) settings.save() print 'Site %s added' % site_name def remove_site(site_name): """Removes a site from the config.""" try: settings.values['sites'].pop(site_name) except KeyError: fail('Site %s does not exist' % site_name) settings.save() print 'Site %s removed' % site_name def list_sites(): """Prints a list of all sites.""" print 'Registered sites:' print for name, site in sorted(settings.values['sites'].items()): print ' %s' % name for key, value in sorted(site.items()): if isinstance(value, dict): print ' %s:%s' % (key, not value and ' -' or '') for key, value in sorted(value.items()): print ' %s: %s' % (key, value) else: print ' %s: %s' % (key, value) print def add_content_type_if_missing(args, content_type): """Very basic hack that adds a content type if no content type was mentioned so far. """ was_h = False for arg in args: iarg = arg.lower() if iarg.startswith('-hcontent-type'): return elif iarg == '-h': was_h = True elif was_h: if iarg.startswith('content-type'): return was_h = False args.append('-H') args.append('Content-Type: ' + content_type) def handle_curlish_arguments(args): new_args = [] json_pairs = [] argiter = iter(args) def _get_next_arg(error): try: return argiter.next() except StopIteration: fail('Error: ' + error) def handle_json_value(value): if ':=' in value: dkey, value = value.split(':=', 1) try: value = json.loads(value) except Exception: fail('Error: invalid JSON data for "%s"' % dkey) elif '=' in value: dkey, value = value.split('=', 1) vlaue = json.dumps(value) else: fail('Error: malformed json data with -J') json_pairs.append((dkey, value)) for idx, arg in enumerate(argiter): # Automatic -X in front of known http method names if arg in KNOWN_HTTP_METHODS: new_args.append('-X' + arg) elif arg == '-J': handle_json_value(_get_next_arg('-J requires an argument')) elif arg.startswith('-J'): handle_json_value(arg[2:]) # Regular argument else: new_args.append(arg) json_data = decode_flat_data(json_pairs) need_json = bool(json_data) if len(json_data) == 1 and '' in json_data: json_data = json_data[''] if need_json: add_content_type_if_missing(new_args, 'application/json') new_args.append('--data-binary') new_args.append(json.dumps(json_data)) return new_args def invoke_curl(site, curl_path, args, url_arg): if args[0] == '--': args.pop(0) if not curl_path: fail('Could not find curl. Put it into your config') url = args[url_arg] if site is not None and site.bearer_transmission is not None: if site.bearer_transmission == 'header': args += ['-H', 'Authorization: Bearer %s' % site.access_token] elif site.bearer_transmission == 'query': url += ('?' in url and '&' or '?') + 'access_token=' + \ urllib.quote(site.access_token) else: fail('Bearer transmission %s is unknown.' % site.bearer_transmission) args[url_arg] = url if site is not None: for key, value in site.extra_headers.iteritems(): args += ['-H', '%s: %s' % (key, value)] # Force response headers hide_headers = False if not any(arg == '-i' or (arg[:1] == '-' and \ arg[1:2] != '-' and 'i' in arg) for arg in args): args.append('-i') hide_headers = True # Hide stats args.append('-s') # Handle curlish specific argument shortcuts args = handle_curlish_arguments(args) p = subprocess.Popen([curl_path] + args, stdout=subprocess.PIPE) beautify_curl_output(p.stdout, hide_headers) # Load the settings once before we start up settings = Settings() def main(): parser = argparse.ArgumentParser(description="curl, with flames on top", add_help=False) parser.add_argument('-h', '--help', action='store_true', help='Prints this help.') parser.add_argument('--site', help='The site to use. By default it will ' 'guess the site from the URL of the request.') parser.add_argument('--clear-token-cache', action='store_true', help='Clears the token cache. By default of all the ' 'sites, can be limited to one site with --site.') parser.add_argument('--add-site', help='Registers a new site with curlish.', metavar='NAME') parser.add_argument('--remove-site', help='Unregisters a site from curlish.', metavar='NAME') parser.add_argument('--list-sites', help='Lists all known sites', action='store_true') try: args, extra_args = parser.parse_known_args() except Exception as e: print e sys.exit(1) if args.help: parser.print_help() print __doc__.rstrip() return # Custom commands if args.clear_token_cache: clear_token_cache(args.site) return if args.add_site: add_site(args.add_site) return if args.remove_site: remove_site(args.remove_site) return if args.list_sites: list_sites() return # Redirect everything else to curl via the site url_arg = find_url_arg(extra_args) if url_arg is None: parser.print_usage() return site = get_site(args.site, extra_args[url_arg]) if site is not None and site.grant_type is not None: site.fetch_token_if_necessarys() settings.save() invoke_curl(site, settings.values['curl_path'], extra_args, url_arg) if __name__ == '__main__': try: main() except KeyboardInterrupt: pass