#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2004, Gustavo Sverzut Barbieri # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 59 Temple # Place, Suite 330, Boston, MA 02111-1307 USA # # NOTES: # * GMail functionality from Adrian Holovaty (http://www.holovaty.com) # * KAddressBook format and csv support on version 0.4 by # Andre Luiz Carvalho (alcarvalho@gmail.com) # # CHANGELOG: # 0.1 - Initial Release # 0.2 - * Changed lang to format. Yahoo CSV is constant through languages, # the lang is so translated in format, allowing to support multiple # CSV formats. # * Better handling of exceptions. # * Bad login detection. # * Support for non-ASCII chars (google uses utf-8!) # 0.3 - * KAddressBook support # * Better handling of unicode on ASCII terminals # 0.4 - * Better handling of csv files, through csv class # * Fixed bug for address field of KAddressBook # * Handling exception CSVEmpty at the console interface # 0.5 - * Orkut CSV support # * Added encoding map to csv # * Removed web encoding selection, improved guessing # 0.6 - * Outlook support (just Name and Email) # # TODO: # * Detect format by header # * Possible way to map non previously mapped CSV on demand (web interface) DEBUG = 0 defweblocale = "pt_BR.UTF-8" from Cookie import SimpleCookie import time, urllib, urllib2, re, random, sys, os, getpass, socket, csv import locale __version__ = "0.6" __date__ = "2004-06-30" __author__ = "Gustavo Sverzut Barbieri (barbieri@gmail.com)" socket.setdefaulttimeout( 30 ) LC_ALL = locale.setlocale( locale.LC_ALL, "" ) ## # format_map format_map = { "yahoo": "Yahoo!", "kaddressbook": "KDE Address Book", "orkut": "Orkut", "outlook-small": "Outlook (just Name & Email)", } ## # csv_map: # Attribute index in the CVS file csv_map = { "yahoo": ( u"firstname", u"middlename", u"lastname", u"nickname", u"email", u"category", u"lists", u"yahooid", u"phonehome", u"phonework", u"pager", u"fax", u"phonemobile", u"phoneother", u"phoneyahoo", u"main", u"email1", u"email2", u"homepage", u"workpage", u"role", u"company", u"workaddress", u"workcity", u"workstate", u"workzip", u"workcountry", u"homeaddress", u"homecity", u"homestate", u"homezip", u"homecountry", u"birthday", u"anniversary", u"extra1", u"extra2", u"extra3", u"extra4", u"comments" ), "kaddressbook": ( u"formattedname", u"lastname", u"firstname", u"middlename", u"honorprefix", u"honorsufix", u"nickname", u"birthday", u"homeaddress", u"homecity", u"homestate", u"homezip", u"homecountry", u"homelabel", u"workaddress", u"workcity", u"workstate", u"workzip", u"workcountry", u"worklabel", u"phonehome", u"phonework", u"phonemobile", u"fax", u"workfax", u"phoneother", u"ISDN", u"pager", u"email", u"mailclient", u"title", u"role", u"company", u"comments", u"homepage", u"department", u"profession", u"assistantsname", u"managersname", u"spousesname", u"office", u"imaddress", u"anniversary" ), "orkut": ( u'firstname', u'lastname', u'email' ), "outlook-small": ( u'firstname', u'email' ) } csv_defencoding_map = { "kaddressbook": None, "yahoo": "latin-1", "orkut": "utf-8", "outlook-small": "latin-1", } class csv_dialect_rn( csv.excel ): pass # csv_dialect_rn class csv_dialect_n( csv.excel ): lineterminator = '\n' # csv_dialect_n ## # CSV Dialect map csv_dialect_map = { "kaddressbook": csv_dialect_n(), "yahoo": csv_dialect_n(), "orkut": csv_dialect_rn(), "outlook-small": csv_dialect_rn(), } # Check if I forgot to map something if len( csv_dialect_map ) != len( csv_map ) \ or len( csv_map ) != len( csv_defencoding_map ) \ or len( csv_map ) != len( format_map ): raise Exception( "Must fix maps csv_dialect_map, csv_map or " \ "csv_defencoding_map!" ) class ContactAttrList: """ List attribute used in Contact class. """ value = None def __init__( self, value=None ): self.value = value or [] # __init__() def set( self, value ): """ Set list value. Value should be a unicode object representing the list, comma separated. """ if not isinstance( value, unicode ): raise TypeError( "ContactAttrList.set() " \ "should receive a string, comma separated." ) self.value = value.split(",") # set() def __str__( self ): return ",".join( self.value ) # __str__() # ContactAttrList class Contact: """ Holds contact information. """ firstname = None middlename = None lastname = None nickname = None email = None category = None lists = None yahooid = None phonehome = None phonework = None pager = None fax = None phonemobile = None phoneother = None phoneyahoo = None main = None email1 = None email2 = None homepage = None workpage = None role = None company = None workaddress = None workcity = None workstate = None workzip = None workcountry = None homeaddress = None homecity = None homestate = None homezip = None homecountry = None birthday = None anniversary = None extra1 = None extra2 = None extra3 = None extra4 = None comments = None #KAddressBook ISDN = None assistantsname = None department = None formattedname = None homelabel = None honorprefix = None honorsufix = None imaddress = None mailclient = None managersname = None office = None profession = None spousesname = None title = None workfax = None worklabel = None def __init__( self ): self.lists = ContactAttrList() # __init__() def setAttribute( self, attr, value ): """ Set attribute value, calling the attribute set() method if available. This allow special attributes to have it's own value checking. attr should be the name of the attribute (unicode) or the attribute object itself. """ if attr and \ isinstance( attr, unicode ) and \ hasattr( self, attr ): a = getattr( self, attr ) else: a = attr if hasattr( a, "set" ): a.set( value ) else: setattr( self, attr, value ) # setAttribute() def __str__( self ): s = u"Contact:\n" for i in dir( self ): if i.startswith( "__" ) or i == "setAttribute": continue else: v = getattr( self, i ) if v != None: s += u"\t%s: \"%s\"\n" % ( i, v ) else: s += u"\t%s: \n" % ( i ) return s # __str__() # Contact class CSVWrongFormat( Exception ): pass class CSVEmpty( Exception ): pass class CSV: """ CSV reader and processer. Knows how to build a list of contacts. """ contacts = None file = None format = None encoding = None def __init__( self, file=None, format=None, encoding=None, contacts=None ): if file: self.setFile( file, format, encoding ) self.contacts = contacts or [] # __init__() def setFile( self, file, format, encoding=None ): """ Set CSV specs. """ self.file = file self.format = format self.encoding = encoding or self.encoding if not self.encoding: try: self.encoding = csv_defencoding_map[ self.format ] except KeyError: raise CSVWrongFormat( "Format '%s' is not supported yet!" % self.format ) if not self.encoding: e = LC_ALL.lower().split( "." ) if len( e ) > 1: self.encoding = e[ 1 ] else: self.encoding = "latin-1" # setFile() def readFile( self ): """ Read CSV file and parse it, building the contacts list. Raises CSVEmpty if there's no contact or CSVWrongFormat if the given format is not mapped yet. """ f = self.file if isinstance( f, str ) or isinstance( f, unicode ): f = file( self.file ) try: m = csv_dialect_map[ self.format ] except KeyError: raise CSVWrongFormat( "Format '%s' is not supported yet!" % self.format ) c = csv.reader( f , m ) try: map = csv_map[ self.format ] except KeyError: raise CSVWrongFormat( "Format '%s' is not supported yet!" % self.format ) c.next() line = 0 for l in c: line += 1 self.contacts += [ Contact() ] for i in range( len( l ) ): text = l[ i ].decode( self.encoding ) # decode str to unicode if i >= len( map ): print ( "Element index %d, line %d (\"%s\") not " \ " mapped for this format (%s)" ) % \ ( i, line, text, self.format ) else: self.contacts[ -1 ].setAttribute( map[ i ], text ) if not self.contacts: raise CSVEmpty( "File doesn't contain any contact!" ) # readFile() # CSV ############################################################################## # Gmail functionality # # Gmail class is a small part from GmailClient, by Adrian Holovaty # (holovaty@gmail.com). # class GmailContactCouldNotBeAdded( Exception ): pass class GmailWrongLogin( Exception ): pass class Gmail: __version__ = "0.2" __date__ = "2004-06-20" __author__ = "Adrian Holovaty (holovaty@gmail.com)" _cookies = None _contacts = None # Gmail says this when it complains FAILURE_MESSAGE = 'Your action was not successful' WRONGLOGIN_MESSAGE = 'Username and password do not match.' def __init__( self ): self._cookies = SimpleCookie() self._folder_cache, self._message_cache = {}, {} self._contacts = [] # __init__() def login( self, username, password ): epoch_secs = int( time.time() ) self._cookies[ "GMAIL_LOGIN" ] = "T%s/%s/%s" % \ ( epoch_secs-2, epoch_secs-1, epoch_secs ) p = self._get_page( "https://www.google.com/accounts/ServiceLoginBoxAuth", ( "continue=" \ "https://gmail.google.com/gmail&service=mail&" \ "Email=%s&Passwd=%s&submit=null" ) % ( username, password ) ) c = p.read() p.close() r = re.search( 'var cookieVal\s*=\s*"([^"]+)"', c ) if not r or c.find( self.WRONGLOGIN_MESSAGE ) > -1: raise GmailWrongLogin, "Wrong username or password." self._cookies['GV'] = r.groups()[ 0 ] p = self._get_page( "https://www.google.com/accounts/CheckCookie?" \ "continue=" \ "http%3A%2F%2Fgmail.google.com%2Fgmail" \ "&service=mail&chtml=LoginDoneHtml" ) p = self._get_page( "http://www.google.com/" ) p.close() p = self._get_page( "http://gmail.google.com/gmail?view=page&name=js" ) c = p.read() p.close() r = re.search( "var js_version\s*=\s*'([^']+)'", c ) if not r: raise GmailWrongLogin, "Gmail might have redesigned." self._js_version = r.groups()[ 0 ] # login() def _get_page( self, url, post_data=None ): """ Helper method that gets the given URL, handling the sending and storing of cookies. Returns the requested page as a file-like object in the format returned by urllib2.urlopen(). Raises urllib2.HTTPError on connection failure. Note: Added url, header and post_data to HTTPError. """ if isinstance( url, unicode ): url = url.encode( "utf-8" ) if isinstance( post_data, unicode ): post_data = post_data.encode( "utf-8" ) req = urllib2.Request( url ) if post_data: req.add_data( post_data ) ch = self._cookies.output( attrs=[], header='' ).strip() if isinstance( ch, unicode ): ch = ch.encode( "utf-8" ) req.add_header( 'Cookie', ch ) req.add_header( 'Charset', 'utf-8' ) if DEBUG: print ( "Get Page: %s\nPostData:\n%s\nHeaders:\n%s\n" + \ "-" * 79 ) % ( url, post_data, req.headers ) try: f = urllib2.urlopen( req ) # Throws HTTPError! except urllib2.HTTPError, e: e.url = url e.post_data = post_data e.header = req.headers if DEBUG: print "-" * 79 print "Request failed! HTTP Error %s: %s" % ( e.code, e.msg ) raise e if f.headers.dict.has_key( 'set-cookie' ): self._cookies.load( f.headers.dict[ 'set-cookie' ] ) if DEBUG: print "-" * 79 print "Success!" return f # _get_page() def add_contact(self, name, email, notes='' ): """ Adds a contact with the given name, e-mail and notes to this Gmail account's address book. Raises ContactCouldNotBeDeleted on error. """ # google uses UTF-8. Need to convert before urllib.quote()! if isinstance( name, unicode ): name = name.encode( "utf-8" ) if isinstance( email, unicode ): email = email.encode( "utf-8" ) if isinstance( notes, unicode ): notes = notes.encode( "utf-8" ) p = self._get_page( "https://gmail.google.com/gmail?view=address&act=a", ( "at=%s&name=%s&email=%s¬es=%s&ac=Add+Contact&" \ "operation=Edit" ) % \ ( self._cookies[ "GMAIL_AT"].value, urllib.quote( name ), urllib.quote( email ), urllib.quote( notes ) ) ) if p.read().find( self.FAILURE_MESSAGE ) > -1: raise GmailContactCouldNotBeAdded, "Gmail might have redesigned." p.close() # add_contact() def get_contacts(self, clear_cache=False): """ Returns a list of lists representing all the contacts for this Gmail account, in the format ['email', 'contact name']. """ if clear_cache or not self._contacts: p = self._get_page( ( "https://gmail.google.com/gmail?view=page&name=contacts" \ "&zx=%s%s" ) % \ ( self._js_version, random.randint( 0, 1000000000 ) ) ) # security risk!!! self._contacts = eval(p.read()) p.close() return self._contacts # get_contacts() # Gmail def process_contacts( contacts ): export_list = [] skip_list = [] for i in contacts: name = [ i.firstname, i.middlename, i.lastname ] name = [ n for n in name if n ] name = u" ".join( name ).strip() email = i.email or i.email1 or i.email2 if email: if not name: name = email.split( "@" )[ 0 ] name = name.replace( "_", " " ) name = name.replace( "-", " " ) name = name.replace( ".", " " ) name = name.replace( " ", " " ) name = name.title() export_list += [ ( name, email ) ] elif name: skip_list += [ name ] return ( export_list, skip_list ) # process_contacts() ############################################################################### ## Text User Interface (Command Line) ############################################################################### out_enc = "ascii" e = LC_ALL.lower().split( '.' ) if len( e ) > 1: out_enc = e[ 1 ] def p( m ): if isinstance( m, unicode ): m = m.encode( out_enc, "replace" ) print m # p() def ui_text( argv ): pname = argv[ 0 ] fname = None format = None encoding = None username = None if len( argv ) > 3: username = argv[ 1 ] fname = argv[ 2 ] format = argv[ 3 ] else: print "%s v%s by %s." % ( pname, __version__, __author__ ) print "Usage:" print ( "\t%s " \ " [encoding]" ) % pname print "\nExample:" print ( "\t%s myaccount Yahoo.csv " \ "yahoo iso-8859-1" ) % pname print sys.exit( 1 ) if len( argv ) > 4: encoding = argv[ 4 ] c = CSV( fname, format, encoding ) p( u"Parsing CSV file: %s, format: %s, encoding: %s..." % \ ( c.file, c.format, c.encoding ) ) try: c.readFile() except CSVWrongFormat, e: p( u"ERROR: %s" % e ) return 1 except CSVEmpty, e: p( u"ERROR: %s" % e ) return 1 export_list, skip_list = process_contacts( c.contacts ) for name in skip_list: p( u"WARN: \"%s\" skipped. No email found!" % name ) gmail = Gmail() password = getpass.getpass( "Gmail password for %s: " % username ) p( u"Logging in %s@gmail.com..." % username ) cont = 0 while not cont: try: gmail.login( username, password ) cont = 1 except GmailWrongLogin, e: p( u"ERROR: Wrong Login: %s" % e ) sys.exit( 1 ) except urllib2.HTTPError, e: p( u"WARN: HTTP Error %s: %s" % ( e.code, e.msg ) ) if DEBUG: print ">>>> URL: %s" % e.url print ">>>> Headers: %s" % e.header print ">>>> Post Data:\n%s\n\n" % e.post_data except KeyboardInterrupt: p( u"INFO: Keyboard interruption... process aborted." ) sys.exit( 1 ) except Exception, e: p( u"WARN: %s" % e ) p( u"INFO: Trying login again..." ) p( u"Getting existing gmail contacts..." ) cont = 0 while not cont: try: gcontacts = gmail.get_contacts() cont = 1 except Exception, e: p( u"WARN: %s" % e ) p( u"INFO: Trying to get contacts again..." ) gcontacts = [ c[0] for c in gcontacts ] ol = export_list export_list = [] for i in ol: if i[ 1 ] not in gcontacts: export_list += [ i ] else: p( u"INFO: \"%s\" <%s> skipped. Already there!" % i ) p( u"Exporting:" ) for i in export_list: p( u' "%s" <%s>' % i ) cont = 0 while not cont: try: gmail.add_contact( i[0], i[1] ) #p( u"DUMMY: add_contact( %s, %s )\n" % i ) cont = 1 except GmailContactCouldNotBeAdded, e: p( u"ERROR: Server rejected contact \"%s\" <%s>" % i ) cont = 1 except Exception, e: p( u"WARN: %s" % e ) p( u"INFO: Trying to add contact again..." ) # ui_text() ############################################################################### ## Web User Interface (CGI ############################################################################### def ui_web_head(): print "Content-Type: text/html; charset=utf-8\n" print """ Migrate CSV Contacts to Gmail """ # ui_web_head() def ui_web_foot(): print "
%s version %s, by %s.
\n" % \ ( os.path.basename( sys.argv[ 0 ] ), __version__, __author__ ) print """ """ # ui_web_foot() def ui_web_form(): fname = os.path.basename( sys.argv[ 0 ] ) print "
" print """

Please fill the entire form and then click Migrate!

Gmail
CSV File
Any file named: *.csv, like Yahoo.csv, contacts.CSV, addressbook.csv

Notes:

  • This script is beta software and has no warranty! Send feedback to barbieri@gmail.com
  • Yahoo! Users should get the CSV file with:
    1. Click the tab: Address, (top-left)
    2. Click Import/Export, (top-right)
    3. Choose Yahoo! CSV, With Commas
    4. Save the file to disk, then select it in the file selector, present in this page. The default name is Yahoo.csv
  • Outlook Users should get the CSV file with:
    1. Open the Address Book
    2. In the File menu, select Export
    3. Select Other Address Book...
    4. Select Text file (comma separated values) in the file format options.
    5. Click Export, choose the location of the file to be saved, click next.
    6. Select only the fields Name and Email.
  • Orkut Users should get the CSV file with:
    1. Click Friends tab
    2. Click the download your contacts, at the end of Friends page.
    3. Save the file to disk, then select it in the file selector, present in this page. The default name is contacts.CSV
  • You can get this script source code at http://gustavo.ltc.ic.unicamp.br/~gustavo/csv2gmail.py. It is under the GNU GPL License.
""" # ui_web_form() def ui_web_error_not_full(): print """

Some fields left empty.

""" # ui_web_error_not_full() def pw( m ): """ Print message to web. """ if unicode( m ): m = m.encode( 'utf-8' ) print m # pw() def li( m, cl=None ): """ List Item """ c = "" if cl: c = " class=\"%s\"" % cl pw( u"%s" % ( c, m ) ) # li() def winfo( m ): li( u"INFO: %s" % m, "info" ) # winfo() def wwarn( m ): li( u"WARN: %s" % m, "warning" ) # wwarn() def werror( m ): li( u"ERROR: %s" % m, "error" ) # werror() def ui_web( argv ): import cgi import cgitb; cgitb.enable() form = cgi.FieldStorage() global LC_ALL ui_web_head() if LC_ALL == "C": LC_ALL = locale.setlocale( locale.LC_ALL, defweblocale ) try_encodings = ( "latin-1", "utf-8", "iso-8859-1", "iso-8859-15" ) if not form.getvalue( "form_submited", 0 ): ui_web_form() else: try: username = form[ "gmail_username" ].value password = form[ "gmail_password" ].value format = form[ "csv_format" ].value fname = form[ "csv_file" ].filename file = form[ "csv_file" ].file encoding = None print "
    " try: cont = 0 t = 0 while not cont: try: c = CSV( file, format, encoding ) li( ( u"Parsing CSV file: \"%s\", " \ u"format: \"%s\", " \ u"encoding: \"%s\"..." ) \ % ( fname, c.format, c.encoding ) ) c.readFile() cont = 1 except UnicodeDecodeError: encoding = try_encodings[ t ] t += 1 except CSVWrongFormat, e: werror( e ) return 1 export_list, skip_list = process_contacts( c.contacts ) for name in skip_list: wwarn( u"\"%s\" skipped. No email found!" % name ) gmail = Gmail() li( "Logging in %s@gmail.com..." % username ) cont = 0 while not cont: try: gmail.login( username, password ) cont = 1 except GmailWrongLogin, e: werror( u" Wrong Login: %s" % e ) return 1 except urllib2.HTTPError, e: wwarn( u"HTTP Error %s: %s" % ( e.code, e.msg ) ) except Exception, e: wwarn( e ) winfo( u"Trying login again..." ) li( "Checking existing gmail contacts..." ) cont = 0 while not cont: try: gcontacts = gmail.get_contacts() cont = 1 except Exception, e: wwarn( e ) winfo( u"Trying to get contacts again..." ) gcontacts = [ c[0] for c in gcontacts ] ol = export_list export_list = [] if ol: print "
      " for i in ol: if i[ 1 ] not in gcontacts: export_list += [ i ] else: winfo( ( u"\"%s\" <%s> skipped. " u"Already there!" )\ % i ) print "
    " if not export_list: li( "Nothing left to be exported." ) else: li( "Exporting:" ) print "
      " for i in export_list: li( u'"%s" <%s>' % i ) cont = 0 while not cont: try: #li( u"DUMMY: add_contact( %s, %s )\n" % i ) gmail.add_contact( i[0], i[1] ) cont = 1 except GmailContactCouldNotBeAdded, e: werror( u"Server rejected contact \"%s\" <%s>" \ % i ) cont = 1 except Exception, e: wwarn( e ) winfo( "Trying to add contact again..." ) print "
    " except Exception, e: werror( "Unknow exception: %s" % e ) except Exception, e: ui_web_error_not_full() print "
" ui_web_foot() # ui_web() if __name__ == "__main__": if sys.argv[ 0 ].endswith( ".cgi" ): ui_web( sys.argv ) else: try: sys.exit( ui_text( sys.argv ) ) except KeyboardInterrupt: print "Interrupted by keyboard (^C). Aborted." sys.exit( 1 )