# Sophie
#
# Sophie
#
# distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > 4eb999dd9ad89c721a22c0ff2a533f65 > files > 99
#
# mms-1.1.0-0.rc9.2mdv2009.1.i586.rpm

"""
 Copyright (c) 2007 Daniel Svensson, <dsvensson@gmail.com>

 Permission is hereby granted, free of charge, to any person
 obtaining a copy of this software and associated documentation
 files (the "Software"), to deal in the Software without
 restriction, including without limitation the rights to use,
 copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the
 Software is furnished to do so, subject to the following
 conditions:

 The above copyright notice and this permission notice shall be
 included in all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 OTHER DEALINGS IN THE SOFTWARE.
"""

import re
import urllib2
import urllib
import cookielib
import os.path
import time

from xml.sax.saxutils import escape

try:
	import elementtree.ElementTree as ET
except:
	import xml.etree.ElementTree as ET

def progress(bs, bc, tot):
    """Print the download percentage whenever it increases.

    The three arguments are multiplied and divided as ``bs * bc / tot``,
    i.e. the amount transferred so far over the total size.  The highest
    percentage printed so far is kept in the module-level ``count``;
    callers restart the reporting by setting ``count`` back to 0.

    Nothing is printed when ``tot`` is zero or negative (size unknown),
    which previously raised ZeroDivisionError for ``tot == 0``.
    """
    global count

    if tot <= 0:
        return

    current = int(float(bs * bc) / float(tot) * float(100))
    # ``count`` is not defined anywhere at module level, so treat a missing
    # value as 0 instead of failing with NameError on the very first call.
    if current > globals().get('count', 0):
        count = current
        print(count)

class VideoStreamError(Exception):
	"""Raised when no playable stream url could be found for a video.

	The value handed to the constructor (the video id in this module) is
	available as ``value``; ``str()`` of the exception is ``repr(value)``.
	"""

	def __init__(self, value):
		# Initialise the base class as well so ``args`` carries the payload
		# for generic handlers, logging and pickling.
		Exception.__init__(self, value)
		self.value = value

	def __str__(self):
		return repr(self.value)

class PrivilegeError(Exception):
	"""Raised when an operation is refused for lack of permissions.

	Takes no arguments; ``value`` always holds the fixed message below and
	``str()`` of the exception is ``repr(value)``.
	"""

	def __init__(self):
		message = 'Insufficient permissions, operation aborted.'
		# Initialise the base class as well so ``args`` carries the message
		# for generic handlers, logging and pickling.
		Exception.__init__(self, message)
		self.value = message

	def __str__(self):
		return repr(self.value)

class YouTube:
	"""YouTube dataminer class.

	Fetches RSS feeds, REST method results (``/api2_rest``) and video pages
	from ``base_url`` and turns them into plain Python lists and dicts.
	Every download goes through retrieve(), which uses a cookie-aware
	urllib2 opener and reports progress through the ``callback`` given to
	the constructor.
	"""

	categories = {1:'Arts & Animation',
	              2:'Autos & Vehicles',
	              23:'Comedy',
	              24:'Entertainment',
	              10:'Music',
	              25:'News & Blogs',
	              22:'People',
	              15:'Pets & Animals',
	              26:'Science & Technology',
	              17:'Sports',
	              19:'Travel & Places',
	              20:'Video Games'}

	# Number of bytes requested per read while downloading.
	block_size = 8192

	def __init__(self, base_path, callback):
		"""Set up urls, the data directory and the cookie-aware opener.

		base_path -- directory in which a 'data' subdirectory is created
		             (if missing) to hold the cookie file.
		callback  -- callable invoked as callback(percent, message) while
		             content is downloaded; retrieve() ends every download
		             with callback(None, "YouTube").
		"""
		self.base_path = base_path
		self.callback = callback
		# Highest percentage reported so far for the running download.
		self.count = 0

		# pattern to match youtube video session id.
		self.session_pattern = re.compile('&t=([0-9a-zA-Z-_]{32})')
		# pattern to match login status
		self.login_pattern = re.compile('Log In')

		# various urls
		self.base_url = 'http://www.youtube.com'
		self.api_url = self.base_url + '/api2_rest?method=%s&dev_id=k1jPjdICyu0&%s'
		self.feed_url = self.base_url + '/rss/global/%s.rss'
		self.stream_url = self.base_url + '/get_video?video_id=%s&t=%s'
		self.video_url = self.base_url + '/?v=%s'
		self.search_url = self.base_url + '/rss/search/%s.rss'
		self.user_url = self.base_url + '/rss/user/%s/videos.rss'
		self.confirm_url = self.base_url + '/verify_age?next_url=/watch?v=%s'
		self.ajax_url = self.base_url + '/watch_ajax'

		# should exotic characters be stripped?
		self.strip_chars = True

		# Create the data subdirectory (and missing parents) if needed.
		self.data_dir = os.path.join(self.base_path, 'data')
		if not os.path.exists(self.data_dir):
			os.makedirs(self.data_dir)

		# Cookie stuff
		self.cookie_file = os.path.join(self.data_dir, 'cookie.lwp')

		self.cj = cookielib.LWPCookieJar()
		if os.path.isfile(self.cookie_file):
			self.cj.load(self.cookie_file)

		# Cookie-aware opener.  It is kept on the instance for retrieve()
		# and still installed globally, as before, for any other code that
		# calls urllib2.urlopen().
		self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self.cj))
		urllib2.install_opener(self.opener)

		# Callback related stuff
		self.report_hook = None
		self.report_udata = None
		self.filter_hook = None
		self.filter_udata = None

	def progressContent(self, bs, bc, tot):
		"""Report download progress through the constructor callback.

		The percentage is ``bs * bc / tot`` capped at 100; the callback is
		only invoked when the percentage grows.  A zero or negative ``tot``
		(size unknown) is ignored instead of dividing by zero.
		"""
		if tot <= 0:
			return

		current = min(int(float(bs * bc) / float(tot) * float(100)), 100)
		if current > self.count:
			self.count = current
			self.callback(self.count, "Getting content")

	def strip_exotic_chars(self, str):
		"""Drop characters with an ordinal of 256 or above from str.

		If anything was dropped, '[invalid characters]' is appended
		(separated by a space when some text remains); otherwise the
		input is returned unchanged.
		"""
		# Dump exotic characters so we don't have to watch ugly boxes
		kept = ''.join([c for c in str if ord(c) < 256])
		if kept == str:
			return str
		if kept:
			kept = kept + ' '
		return kept + '[invalid characters]'

	def get_rss(self, url):
		"""Download the RSS document at url and return a (title, id) list.

		The id is taken from the last 11 characters of each item's link.
		Items without a link are skipped; a missing or empty title is
		treated as an empty string.
		"""
		tree = ET.XML(self.retrieve(url))

		videos = []
		for node in tree.findall('channel/item'):
			link = node.findtext('link')
			if not link:
				# No link means no video id to hand back.
				continue
			title = node.findtext('title') or ''
			if self.strip_chars:
				title = self.strip_exotic_chars(title)
			videos.append((title, link[-11:]))

		return videos

	def get_user_videos(self, user):
		"""Assemble user videos url and return a (desc, id) list."""

		return self.get_rss(self.user_url % user)

	def get_feed(self, feed):
		"""Assemble feed url and return a (desc, id) list."""

		return self.get_rss(self.feed_url % feed)

	def search(self, term):
		"""Assemble a search query and return a (desc, id) list.

		The term is url-quoted (spaces become '+'), so characters such as
		'&', '/' or '?' can no longer corrupt the request url.
		"""
		if not isinstance(term, str):
			# unicode term: quote its UTF-8 bytes.
			term = term.encode('utf-8')
		return self.get_rss(self.search_url % urllib.quote_plus(term))

	def call_method(self, method, param):
		"""Call a REST method and return the result as an ElementTree.

		method -- REST method name placed in the 'method' query argument.
		param  -- ready-made 'key=value[&key=value]' query fragment; it is
		          inserted into the url as is.
		"""
		url = self.api_url % (method, param)
		return ET.XML(self.retrieve(url))

	def get_video_list(self, method, param):
		"""Return a list of (desc, id) pairs from the REST result.

		Entries without an id are skipped; a missing or empty title is
		treated as an empty string.
		"""
		tree = self.call_method(method, param)

		videos = []
		for node in tree.findall('video_list/video'):
			video_id = node.findtext('id')
			if not video_id:
				continue
			title = node.findtext('title') or ''
			if self.strip_chars:
				title = self.strip_exotic_chars(title)
			videos.append((title, video_id))

		return videos

	def _children_as_dict(self, tree, path):
		"""Map the tag of every node matched by path to its text."""
		return dict([(node.tag, node.text) for node in tree.findall(path)])

	def get_user_profile(self, name):
		"""Collect user profile data from the REST call.

		Returns a dict mapping each child tag of 'user_profile' to its text.
		"""
		tree = self.call_method('youtube.users.get_profile', 'user=%s' % name)
		return self._children_as_dict(tree, 'user_profile/*')

	def get_user_favorites(self, name):
		"""Return a list of (desc, id) pairs."""

		return self.get_video_list('youtube.users.list_favorite_videos',
		                           'user=%s' % name)

	def get_user_friends(self, name, page=None, per_page=None):
		"""Return a list of friends.

		page and per_page are accepted but currently not sent to the server.
		"""
		tree = self.call_method('youtube.users.list_friends', 'user=%s' % name)
		return [node.text for node in tree.findall('friend_list/friend/user')]

	def get_video_details(self, id):
		"""Collect video details data from the REST call.

		Returns a dict mapping each child tag of 'video_details' to its text.
		"""
		tree = self.call_method('youtube.videos.get_details', 'video_id=%s' % id)
		return self._children_as_dict(tree, 'video_details/*')

	def get_videos_by_tag(self, tag,
	                      page=None, per_page=None):
		"""Return a list of (desc, id) pairs.

		page and per_page are accepted but currently not sent to the server.
		"""
		return self.get_video_list('youtube.videos.list_by_tag',
		                           'tag=%s' % tag)

	def get_videos_by_user(self, user,
	                       page=None, per_page=None):
		"""Return a list of (desc, id) pairs.

		page and per_page are accepted but currently not sent to the server.
		"""
		return self.get_video_list('youtube.videos.list_by_user',
		                           'user=%s' % user)

	def get_videos_by_related(self, tag,
	                          page=None, per_page=None):
		"""Return a list of (desc, id) pairs.

		page and per_page are accepted but currently not sent to the server.
		"""
		return self.get_video_list('youtube.videos.list_by_related',
		                           'tag=%s' % tag)

	def get_videos_by_playlist(self, id,
	                           page=None, per_page=None):
		"""Return a list of (desc, id) pairs.

		page and per_page are accepted but currently not sent to the server.
		"""
		return self.get_video_list('youtube.videos.list_by_playlist',
		                           'id=%s' % id)

	def get_videos_by_tag_and_category(self, category, tag,
	                                   page=None, per_page=None):
		"""Return a list of (desc, id) pairs.

		category must be an integer (it is formatted with %d).  page and
		per_page are accepted but currently not sent to the server.
		"""
		param = 'category_id=%d&tag=%s' % (category, tag)
		return self.get_video_list('youtube.videos.list_by_category_and_tag',
		                           param)

	def set_filter_hook(self, hook, udata=None):
		"""Set the content filter handler.

		hook(udata) is asked by get_video_url() whether a page that needs
		confirmation should be fetched; a true result means yes.
		"""
		self.filter_hook = hook
		self.filter_udata = udata

	def get_video_url(self, id, confirmed=False):
		"""Return a proper playback url for some YouTube id.

		The video page is downloaded and searched for the session token.
		When none is found on the regular page and the page does not show
		'Log In', the filter hook (if any) decides whether to retry through
		the confirmation form.

		Raises PrivilegeError when the regular page shows 'Log In', and
		VideoStreamError(id) when no stream url could be determined.
		"""
		if not confirmed:
			# Regular video page.
			data = self.retrieve(self.video_url % id)
		else:
			# Filtered video page.
			post = {'next_url': self.video_url % id,
			        'action_confirm':'Confirm'}
			data = self.retrieve(self.confirm_url % id, post)

		if data is not None:
			match = self.session_pattern.search(data)
			if match is not None:
				return self.stream_url % (id, match.group(1))

			if not confirmed:
				if not self.login_status(data):
					raise PrivilegeError()

				# With some luck this only means that the url is protected
				# by login + confirm page.  Ask the user if he wants to
				# show the filtered content.
				if self.filter_hook is not None and self.filter_hook(self.filter_udata):
					return self.get_video_url(id, confirmed=True)

		# Failed to find the video stream url, better complain.
		raise VideoStreamError(id)

	def set_report_hook(self, func, udata=None):
		"""Set the download progress report handler.

		The handler and its user data are only stored; retrieve() reports
		progress through the constructor callback.
		"""
		self.report_hook = func
		self.report_udata = udata

	def retrieve(self, url, data=None, headers=None):
		"""Download url and return the response body.

		url     -- address to fetch.
		data    -- optional POST payload; a dict is urlencoded first, any
		           other non-None value is sent as given.
		headers -- optional dict of extra request headers.

		The request goes through the cookie-aware opener created in
		__init__, so cookies are sent and collected.  The body of an HTTP
		error response is returned like any other body.  The callback gets
		(0, "Getting content") at the start, percentage updates while
		reading and (None, "YouTube") at the end.
		"""
		if headers is None:
			headers = {}
		if data is not None and hasattr(data, 'items'):
			# A raw dict cannot be written to the socket.
			data = urllib.urlencode(data)

		self.count = 0
		self.callback(self.count, "Getting content")

		request = urllib2.Request(url, data, headers)
		try:
			response = self.opener.open(request)
		except urllib2.HTTPError:
			# An HTTPError is itself a readable response object; use it so
			# callers can still inspect the returned page.
			import sys
			response = sys.exc_info()[1]

		try:
			content = self._read_response(response)
		finally:
			response.close()

		self.callback(None, "YouTube")

		return content

	def _read_response(self, response):
		"""Read response to the end in blocks, reporting progress."""
		try:
			total = int(response.info().get('Content-Length'))
		except (TypeError, ValueError):
			# Header missing or malformed: size unknown.
			total = -1

		received = 0
		pieces = []
		while True:
			piece = response.read(self.block_size)
			if not piece:
				break
			pieces.append(piece)
			received += len(piece)
			self.progressContent(received, 1, total)

		# ``piece`` is now the empty string returned at end of data; joining
		# on it yields the same string type the response produced.
		return piece.join(pieces)

	def login(self, username, password):
		"""Login with username, password and return status.

		The cookie jar is saved to the cookie file after the request.
		"""
		# NOTE(review): 'Log+In' is percent-encoded again when the form is
		# urlencoded in retrieve(); confirm the server expects a literal '+'.
		post = {'username':username,
		        'password':password,
		        'current_form':'loginForm',
		        'action_login':'Log+In'}

		data = self.retrieve(self.base_url + '/login?next=/', post)
		self.cj.save(self.cookie_file)

		return self.login_status(data)

	def login_status(self, data=None):
		"""Return True if logged in, otherwise False.

		data is the page text to inspect; the front page is downloaded when
		it is omitted.  A page containing 'Log In' counts as logged out.
		"""
		if data is None:
			data = self.retrieve(self.base_url)

		return self.login_pattern.search(data) is None

	def user_add_favorite(self, id):
		"""Add some video id to the user favorites.

		Returns True on success and raises PrivilegeError when the reply
		has no 'return_code' element or its text is not '0'.
		"""
		post = {'':'OK',
		        'action_add_favorite_playlist':'1',
		        'video_id':id,
		        'playlist_id':'',
		        'add_to_favorite':'on'}

		headers = {'Content-Type':'application/x-www-form-urlencoded'}

		root = ET.XML(self.retrieve(self.ajax_url, post, headers))

		node = root.find('return_code')
		if node is None or node.text != '0':
			raise PrivilegeError()

		return True


# Manual smoke test: run the module directly, optionally with
# "<username> <password>" as arguments to exercise the login-only calls.
if __name__ == '__main__':
	import sys

	def status(percent, message):
		"""Progress callback required by the YouTube constructor."""
		if percent is None:
			# End of a download: finish the progress line.
			sys.stderr.write('\n')
		else:
			sys.stderr.write('\r%s: %d%%' % (message, percent))
		sys.stderr.flush()

	# The constructor takes a progress callback as second argument; calling
	# it with the path alone raised TypeError before anything was fetched.
	yt = YouTube(".", status)

	def report(done, size, udata):
		# Skip the percentage when the size is unknown or zero.
		if size > 0:
			text = '\r%d    ' % int((done*100.0)/size)
			sys.stderr.write(text)
			sys.stderr.flush()
		if done == size:
			print('\r')

	def filter_confirm(udata):
		return True

	yt.set_report_hook(report)

	try:
		# Disabled demo calls, kept for reference:
		#
		# print "User Profile (sneseglarn):"
		# print yt.get_user_profile('sneseglarn')
		# print "------------------------------------------"
		# print "User Favorite Videos (sneseglarn):"
		# print yt.get_user_favorites('sneseglarn')
		# print "------------------------------------------"
		# print "User Friends (bungloid):"
		# print yt.get_user_friends('bungloid')
		# print "------------------------------------------"
		# print "Video Details (NGrrPReQaOE):"
		# print yt.get_video_details('NGrrPReQaOE')
		# print "------------------------------------------"
		# print "Videos by Tag ('blender')"
		# print yt.get_videos_by_tag('blender')
		# print "------------------------------------------"
		# print "Videos by Tag and Category (1, 'blender')"
		# print yt.get_videos_by_tag_and_category(1, 'blender')
		# print "------------------------------------------"
		# print "Videos from Feed ('recently_featured')"
		# print yt.get_feed('recently_featured')
		# print "------------------------------------------"

		# The label now names the id that is actually requested.
		print("Videos Url from Id ('VW_N-qc1Ov8')")
		print(yt.get_video_url('VW_N-qc1Ov8'))
		print("------------------------------------------")

		print("Videos from Search ('snowboard')")
		print(yt.search('snowboard'))
		print("------------------------------------------")
		print("Videos from User ('sneseglarn')")
		print(yt.get_user_videos('sneseglarn'))
		print("------------------------------------------")

		if len(sys.argv) == 3:
			print("Login status")
			print(yt.login_status())
			print("------------------------------------------")
			print("Logging in")
			print(yt.login(sys.argv[1], sys.argv[2]))
			print("------------------------------------------")
			print("Login status")
			print(yt.login_status())
			print("------------------------------------------")
			print("Video Url from filtered Id ('M23If6Sqe-Q')")
			yt.set_filter_hook(filter_confirm)
			print(yt.get_video_url('M23If6Sqe-Q'))
			print("------------------------------------------")
			print("Add Video to Favorites ('M23If6Sqe-Q')")
			print(yt.user_add_favorite('M23If6Sqe-Q'))

	#except DownloadError, e:
	#	print "download failed: %s" % e
	#except DownloadAbort, e:
	#	print "download aborted: %s " % e
	except VideoStreamError:
		# sys.exc_info() avoids both "except X, e" and "except X as e", so
		# the handler is valid on every Python release.
		print("could not get video url for %s" % sys.exc_info()[1])
	except PrivilegeError:
		print("login required for this operation")