Python Forum
Youtube Watched History Analyzer
#1
Hey again! I watch YouTube a lot, so I wrote a simple script to collect interesting data from my watch history, though the results were a bit scary.
I'm sharing it here in case you want to beat my numbers.

Features
  • Supports the history of several accounts
  • Returns JSON data in a results directory
  • Average number of videos watched per period (day, month, year...) and average video length
  • An easy-to-read watch history (clean_history) with only the interesting data
  • Most viewed channels, tags, topics...
  • Customizable result files

Installing
  • Your YouTube watch history data from Google (watch-history.json)
  • Your API key
  • The YouTube API module: $ pip install --upgrade google-api-python-client
  • Update the values under '# Init' in the code ('API_key', 'files', 'results_dir')
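Before running the script, it can help to confirm that a history file contains the fields the script reads from each entry (`snippet.title`, `snippet.publishedAt` and `contentDetails.videoId`). The following is only a sketch written for Python 3; the helper names `is_usable_entry` and `count_usable_entries` are hypothetical and not part of the script.

```python
import json

# Fields the script accesses on every watch-history entry.
REQUIRED_KEYS = {
    'snippet': ('title', 'publishedAt'),
    'contentDetails': ('videoId',),
}

def is_usable_entry(entry):
    """Return True if the entry has every field the script accesses."""
    for section, keys in REQUIRED_KEYS.items():
        part = entry.get(section)
        if not isinstance(part, dict):
            return False
        if any(key not in part for key in keys):
            return False
    return True

def count_usable_entries(path):
    """Hypothetical helper: return (usable, total) for one watch-history.json."""
    with open(path) as handle:
        entries = json.load(handle)
    usable = sum(1 for entry in entries if is_usable_entry(entry))
    return usable, len(entries)
```

Calling `count_usable_entries` on each path in `files` gives a quick idea of how many entries the script will be able to process.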
#!/usr/bin/python
#-*- coding: utf-8 -*-

import os
import re
import json
from datetime import datetime, timedelta
from apiclient import discovery
from apiclient.discovery import build

# Init
API_key = "REPLACE_ME"
# List of your watch-history files (allow several accounts)
files = ['/Path/to/watch-history.json','/Path/to/watch-history.json']
results_dir = '/Path/to/Results_dir'
# Load only videos after this date
min_date = "01/01/00" # DD/MM/YY
# List of allowed results files
results = {'clean_history': True, 'average': True, 'channels': True, 'topics': True, 'tags': True, 'days': True, 'months': True, 'years': True}
# Average percentage of videos watched
watch_percentage = 60 # %

service = build("youtube", "v3",developerKey=API_key)
Day = ['Monday','Tuesday','Friday','Wednesday','Thursday','Sunday','Saturday']
PT_format = re.compile(r'PT((?P<hours>\d+?)H)?((?P<minutes>\d+?)M)?((?P<seconds>\d+?)S)?')
min_date = datetime.strptime(min_date, '%d/%m/%y')
watch_percentage /= 100
date_sorter = []
videos = []
average_li = []
sorted_li = []

channel_hm = {}
topic_hm = {}
tag_hm = {}
duration_hm = {}
day_name_hm = {}
day_hm = {}
month_hm = {}
year_hm = {}

# remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
	good_kwargs = {}
	if kwargs is not None:
		for key, value in kwargs.iteritems():
			if value:
				good_kwargs[key] = value
	return good_kwargs

# sample python code for videos.list
def videos_list_by_id(service, **kwargs):
	kwargs = remove_empty_kwargs(**kwargs)
	return service.videos().list(**kwargs).execute()

def parse_time(time_str):
    parts = PT_format.match(time_str)
    if not parts:
        return
    parts = parts.groupdict()
    time_params = {}
    for (name, param) in parts.iteritems():
        if param:
            time_params[name] = int(param)
    return timedelta(**time_params)

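def clear_videos_vars():
	# declare the names global, otherwise the reset only touches local variables
	global video_title, video_description, video_duration, video_thumbnails, video_location, video_topics, video_tags, video_date, channel_id, channel_title
	video_title = video_description = video_duration = video_thumbnails = video_location = video_topics = video_tags = video_date = channel_id = channel_title = None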

def count_data(hm_name,key):
	try:
		hm_name[key] = {'nb':hm_name[key]['nb'] + 1}
	except:
		hm_name[key] = {'nb':1}

def average(dict):
	nb = 0
	length = 0
	for key in dict:
		nb += dict[key]['nb']
		length += 1
	return float(nb)/length

def sorted_list(hm_name):
	sorted_li = []
	for key in hm_name:
		sorted_li.append({'name':key,'nb':hm_name[key]['nb']})
	return sorted(sorted_li, key=lambda k: k['nb'], reverse=True)

# create new path if necessary
if os.path.isdir(results_dir) == False:
	os.makedirs(results_dir)

# save watching date of videos
for f in files:
	for x in json.load(open(f)):
		date = datetime.strptime(x['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%S.000Z')
		delta_date = date - min_date
		if delta_date.total_seconds() > 0:
			date_sorter.append(date)

# sort videos per watching date
print(str(len(date_sorter)) + " videos detected" )
date_sorter.sort(reverse=True)

for f in files:
	for x in json.load(open(f)):
		date = datetime.strptime(x['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%S.000Z')
		delta_date = date - min_date
		if delta_date.total_seconds() < 0:
			continue
		
		# define video position
		pos = 0
		for sorted_date in date_sorter:
			if sorted_date == date:
				break
			else:
				pos += 1

		# access to video data
		if x['snippet']['title'] != "Deleted video" and x['snippet']['title'] != "Private video":
			try:
				video_data = videos_list_by_id(service,part='snippet,contentDetails,topicDetails,recordingDetails',id=x['contentDetails']['videoId'])
				try:
					video_location = (video_data['items'][0]['recordingDetails']['location']['latitude'],video_data['items'][0]['recordingDetails']['location']['longitude'])
				except KeyError:
					video_location = None
				try:
					video_topics = video_data['items'][0]['topicDetails']['topicCategories']
				except KeyError:
					video_topics = []
				try:
					video_tags = video_data['items'][0]['snippet']['tags']
				except KeyError:
					video_tags = []

				video_title = video_data['items'][0]['snippet']['title']
				video_description = video_data['items'][0]['snippet']['description']
				video_duration = parse_time(video_data['items'][0]['contentDetails']['duration'])
				video_thumbnails = video_data['items'][0]['snippet']['thumbnails']['default']['url']
				video_date = datetime.strptime(video_data['items'][0]['snippet']['publishedAt'],'%Y-%m-%dT%H:%M:%S.000Z').strftime('%d/%m/%y %H:%M')
				
				channel_id = video_data['items'][0]['snippet']['channelId']
				channel_title = video_data['items'][0]['snippet']['channelTitle']
				
				# save video data in dicts
				for topic in video_topics:
					count_data(topic_hm,topic)
				for tag in video_tags:
					count_data(tag_hm,tag.lower())
				if pos > 0 and pos < len(date_sorter):
					next_date = date_sorter[pos-1]
					if video_duration > next_date - date:
						video_duration = next_date - date
					else:
						video_duration *= watch_percentage
				count_data(duration_hm,video_duration)
				count_data(channel_hm,channel_title)
			except (KeyError,IndexError):
				print("missing information for video '" + x['contentDetails']['videoId'] + "'")
				clear_videos_vars()
			except:
				print("unknown error for video '" + x['contentDetails']['videoId'] + "'")
				clear_videos_vars()
		else:
			print("unable to access video '" + x['contentDetails']['videoId'] + "'")
			clear_videos_vars()
	
		# save date data in dicts
		count_data(year_hm,str(date.year))
		count_data(month_hm,str(date.year) + "/" + str(format(date.month, '02')))
		count_data(day_hm,str(date.year) + "/" + str(format(date.month, '02')) + "/" + str(format(date.day, '02')))
		count_data(day_name_hm,date.strftime("%A"))
					
		# add all data to 'clean_history'
		videos.append({'pos':pos,'date':date.strftime('%d/%m/%y %H:%M'),'video':{'title':video_title,'description':video_description,'duration':str(video_duration),'thumbnails':video_thumbnails,'date':video_date,'location':video_location,'topics':video_topics,'tags':video_tags},'channel':{'title':channel_title,'id':channel_id}})
	print("account " + str(json.load(open(f))[0]['snippet']['channelTitle']) + " done")

# sort and save dicts in results files
for list_result, value in results.iteritems():
	if value:
		if list_result == 'clean_history':
			with open(os.path.join(results_dir,"clean_history.json"), 'w') as outfile:
				json.dump(videos, outfile,indent=4)

		elif list_result == 'average':
			for day in Day:
				try:
					day_name_hm[day]['nb'] = day_name_hm[day]['nb'] / (len(day_hm) / 7.0)
				except KeyError:
					day_name_hm[day] = {'nb':0}
			average_li.append({'videos per':{'day of the week':{Day[0]:day_name_hm[Day[0]]['nb'],Day[1]:day_name_hm[Day[1]]['nb'],Day[2]:day_name_hm[Day[2]]['nb'],Day[3]:day_name_hm[Day[3]]['nb'],Day[4]:day_name_hm[Day[4]]['nb'],Day[5]:day_name_hm[Day[5]]['nb'],Day[6]:day_name_hm[Day[6]]['nb']},'year':average(year_hm),'month':average(month_hm),'day':average(day_hm),'channel':average(channel_hm)}})

			nb = dur_sum = 0
			for key in duration_hm:
				dur_sum += duration_hm[key]['nb'] * key.total_seconds()
				nb += duration_hm[key]['nb']
			average_li.append({'video length (min)':dur_sum/nb/60})

			with open(os.path.join(results_dir,"average.json"), 'w') as outfile:
				json.dump(average_li, outfile,indent=4)

		elif list_result == 'channels':
			with open(os.path.join(results_dir,"channels.json"), 'w') as outfile:
				json.dump(sorted_list(channel_hm), outfile,indent=4)
		elif list_result == 'topics':
			with open(os.path.join(results_dir,"topics.json"), 'w') as outfile:
				json.dump(sorted_list(topic_hm), outfile,indent=4)
		elif list_result == 'tags':
			with open(os.path.join(results_dir,"tags.json"), 'w') as outfile:
				json.dump(sorted_list(tag_hm), outfile,indent=4)

		elif list_result == 'days':
			with open(os.path.join(results_dir,"days.json"), 'w') as outfile:
				json.dump(sorted_list(day_hm), outfile,indent=4)
		elif list_result == 'months':
			with open(os.path.join(results_dir,"months.json"), 'w') as outfile:
				json.dump(sorted_list(month_hm), outfile,indent=4)
		elif list_result == 'years':
			with open(os.path.join(results_dir,"years.json"), 'w') as outfile:
				json.dump(sorted_list(year_hm), outfile,indent=4)

My average.json, if you want to try to beat me :P
Output:
[
    {
        "videos per": {
            "month": 200.36363636363637,
            "year": 1102.0,
            "day": 15.412587412587413,
            "channel": 2.287974683544304,
            "day of the week": {
                "Monday": 17.132867132867133,
                "Tuesday": 18.552447552447553,
                "Friday": 14.734265734265735,
                "Wednesday": 16.88811188811189,
                "Thursday": 9.447552447552448,
                "Sunday": 13.951048951048952,
                "Saturday": 17.181818181818183
            }
        }
    },
    {
        "video length (min)": 2.282149992315967
    }
]
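The "video length (min)" figure is an estimate of the time actually watched, not the full video length. A minimal sketch of the rule the script appears to apply, written for Python 3; the function name `estimate_watch_time` and its parameters are made up for illustration.

```python
from datetime import datetime, timedelta

def estimate_watch_time(duration, watched_at, next_watched_at=None, watch_percentage=60):
    """Estimate how long a video was watched.

    If the next video was started before this one could have finished, the gap
    between the two watch dates is used. Otherwise a fixed percentage of the
    full length is assumed.
    """
    if next_watched_at is None:
        # Most recent video in the history: the script leaves its length untouched.
        return duration
    gap = next_watched_at - watched_at
    if duration > gap:
        return gap
    return duration * watch_percentage / 100

start = datetime(2017, 8, 29, 8, 0)
ten_minutes = timedelta(minutes=10)
print(estimate_watch_time(ten_minutes, start, start + timedelta(minutes=4)))  # cut short by the next video
print(estimate_watch_time(ten_minutes, start, start + timedelta(hours=1)))    # 60 % of the full length
```

Using the gap as an upper bound keeps a long video that was abandoned after a minute from inflating the average.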
#2
New update!

Changelog
 
  • Added the country with the most viewed videos
  • Added the most viewed YouTube category
  • Support for videos with a new ID
  • Optimized, more self-explanatory code
  • More specific exception handling
  • Fixed the 'video_duration' bug
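As a side note, the counting and ranking behind the "most viewed" files could also be done with `collections.Counter` from the standard library. This is an alternative sketch for Python 3, not what the script does (it keeps plain dicts and sorts them afterwards); `rank_counts` and the sample tags are hypothetical.

```python
from collections import Counter

def rank_counts(values):
    """Count occurrences and return (value, count) pairs, most frequent first."""
    return Counter(values).most_common()

tags = ['Music', 'python', 'music', 'Python', 'music']
# Lower-case the tags first, as the script does, so 'Music' and 'music' are merged.
ranking = rank_counts(tag.lower() for tag in tags)
print(ranking)
```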

#!/usr/bin/python
#-*- coding: utf-8 -*-

import os
import re
import json
import operator
from datetime import datetime, timedelta
from apiclient import discovery
from apiclient.discovery import build

# Init
API_key = ";)"
# List of your watch-history files (allow several accounts)
files = ['/Users/mathieu/python/Historique Youtube/Aerosmite/watch-history.json','/Users/mathieu/python/Historique Youtube/Tetedecraft/watch-history.json','/Users/mathieu/python/Historique Youtube/mattraque2000/watch-history.json']
results_dir = '/Users/mathieu/python/Historique Youtube/Resultds'
# Load only videos after this date
min_date = "28/08/17" # DD/MM/YY
# List of allowed results files
results = {'average': True, 'clean_history': True, 'channels': True, 'topics': True, 'tags': True, 'country': True, 'category': True, 'days': True, 'months': True, 'years': True}
# Average percentage of total video watched
watch_percentage = 60 # %

service = build("youtube", "v3",developerKey=API_key)
Day = ['Monday','Tuesday','Friday','Wednesday','Thursday','Sunday','Saturday']
PT_format = re.compile(r'PT((?P<hours>\d+?)H)?((?P<minutes>\d+?)M)?((?P<seconds>\d+?)S)?')
min_date = datetime.strptime(min_date, '%d/%m/%y')
video_part = 'snippet,contentDetails,topicDetails,recordingDetails'
date_sorter = []
videos = []
average_li = []

channel_hm = {}
topic_hm = {}
tag_hm = {}
country_hm = {}
category_hm = {}
categories_hm = {}
duration_hm = {}
day_name_hm = {}
day_hm = {}
month_hm = {}
year_hm = {}

# remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    good_kwargs = {}
    if kwargs is not None:
        for key, value in kwargs.iteritems():
            if value:
                good_kwargs[key] = value
    return good_kwargs

# sample python code for videos.list
def videos_list_by_id(service, **kwargs):
    kwargs = remove_empty_kwargs(**kwargs)
    return service.videos().list(**kwargs).execute()

# sample python code for channels.list
def channels_list_by_id(service, **kwargs):
    kwargs = remove_empty_kwargs(**kwargs)
    return service.channels().list(**kwargs).execute()

# sample python code for videoCategories.list
def video_categories_list(service, **kwargs):
    kwargs = remove_empty_kwargs(**kwargs)
    return service.videoCategories().list(**kwargs).execute()

# sample python code for search.list
def search_list_by_keyword(service, **kwargs):
    kwargs = remove_empty_kwargs(**kwargs)
    return service.search().list(**kwargs).execute()

def get_new_videoId(video_title):
    results = search_list_by_keyword(service,maxResults=3,part='snippet',q=video_title,type='video')
    for video_result in results['items']:
        if video_result['snippet']['title'] == video_title:
            return video_result['id']['videoId']
    return None

def parse_time(time_str):
    parts = PT_format.match(time_str)
    if not parts:
        return
    parts = parts.groupdict()
    time_params = {}
    for (name, param) in parts.iteritems():
        if param:
            time_params[name] = int(param)
    return timedelta(**time_params)

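def clear_videos_vars():
    # declare the names global, otherwise the reset only touches local variables
    global video_title, video_description, video_duration, video_categoryId, video_categoryName, video_thumbnails, video_location, video_topics, video_tags, video_date, channel_id, channel_name, channel_country
    video_title = video_description = video_duration = video_categoryId = video_categoryName = video_thumbnails = video_location = video_topics = video_tags = video_date = channel_id = channel_name = channel_country = None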

def count_data(hm_name,key):
    if key in hm_name.keys():
        hm_name[key] += 1
    else:
        hm_name[key] = 1

def average(dict_name):
    nb = 0
    for key in dict_name:
        nb += dict_name[key]
    return float(nb)/len(dict_name)

def create_file(name,data):
    with open(os.path.join(results_dir,name + ".json"), 'w') as outfile:
        json.dump(data, outfile,indent=4)

# save watching date of videos
for f in files:
    for x in json.load(open(f)):
        date = datetime.strptime(x['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%S.000Z')
        delta_date = date - min_date
        if delta_date.total_seconds() > 0:
            date_sorter.append(date)

if date_sorter == []:
    raise Exception('No videos detected')
print(str(len(date_sorter)) + " videos detected" )
# sort videos per watching date
date_sorter.sort(reverse=True)

for f in files:
    for x in json.load(open(f)):
        date = datetime.strptime(x['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%S.000Z')
        delta_date = date - min_date
        if delta_date.total_seconds() < 0:
            continue

        # define video position
        pos = 0
        for sorted_date in date_sorter:
            if sorted_date == date:
                break
            else:
                pos += 1

        # access to video data
        if x['snippet']['title'] != "Deleted video" and x['snippet']['title'] != "Private video":
            try:
                video_data = videos_list_by_id(service,part=video_part,id=x['contentDetails']['videoId'])
                if len(video_data['items']) == 0:
                    new_videoId = get_new_videoId(x['snippet']['title'])
                    if new_videoId != None:
                        video_data = videos_list_by_id(service,part=video_part,id=new_videoId)
                    else:
                        raise IndexError

                video_title = video_data['items'][0]['snippet']['title']
                video_description = video_data['items'][0]['snippet']['description']
                video_duration = parse_time(video_data['items'][0]['contentDetails']['duration'])
                if pos > 0 and pos < len(date_sorter):
                    next_date = date_sorter[pos-1]
                    if video_duration > next_date - date:
                        video_duration = next_date - date
                    else:
                        video_duration = video_duration * watch_percentage / 100
                video_thumbnails = video_data['items'][0]['snippet']['thumbnails']['default']['url']
                video_categoryId = video_data['items'][0]['snippet']['categoryId']
                video_date = datetime.strptime(video_data['items'][0]['snippet']['publishedAt'],'%Y-%m-%dT%H:%M:%S.000Z').strftime('%d/%m/%y %H:%M')

                channel_id = video_data['items'][0]['snippet']['channelId']
                channel_name = video_data['items'][0]['snippet']['channelTitle']


                if video_data['items'][0].get('recordingDetails') != None:
                    video_location = video_data['items'][0]['recordingDetails'].get('location')
                else:
                    video_location = None
                if video_data['items'][0].get('topicDetails') != None:
                    video_topics = video_data['items'][0]['topicDetails'].get('topicCategories',[])
                else:
                    video_topics = []
                video_tags = video_data['items'][0]['snippet'].get('tags',[])

                # try to get country code
                channel_data = channels_list_by_id(service,part='snippet',id=channel_id)
                channel_country = channel_data['items'][0]['snippet'].get('country',video_data['items'][0]['snippet'].get('defaultLanguage'))

                video_categoryName = None
                if channel_country != None:
                    channel_country = channel_country.lower()
                    count_data(country_hm,channel_country)
                    # try to get category name
                    try:
                        if channel_country in categories_hm.keys():
                            categories_list = categories_hm[channel_country]
                        else:
                            if channel_country == 'en':
                                channel_country = 'us' # or "gb"
                            elif channel_country.split('-')[0] == 'en':
                                channel_country = channel_country.split('-')[1]
                            categories_list = video_categories_list(service,part='snippet',regionCode=channel_country)['items']
                            categories_hm[channel_country] = categories_list
                        for categoryId in categories_list:
                            if categoryId['id'] == video_categoryId:
                                video_categoryName = categoryId['snippet']['title']
                                count_data(category_hm,video_categoryName)
                    except:
                        pass

                # save video data in dicts
                count_data(duration_hm,video_duration)
                count_data(channel_hm,channel_name)
                for topic in video_topics:
                    count_data(topic_hm,topic)
                for tag in video_tags:
                    count_data(tag_hm,tag.lower())

            except (KeyError,IndexError):
                print("missing information for video '" + x['snippet']['title'] + "'")
                clear_videos_vars()
            except Exception as e:
                print(str(e) + " for video '" + x['snippet']['title'] + "'")
                clear_videos_vars()
        else:
            print("unable to access a " + x['snippet']['title'])
            clear_videos_vars()

        # save date in dicts
        count_data(year_hm,str(date.year))
        count_data(month_hm,str(date.year) + "/" + str(format(date.month, '02')))
        count_data(day_hm,str(date.year) + "/" + str(format(date.month, '02')) + "/" + str(format(date.day, '02')))
        count_data(day_name_hm,date.strftime("%A"))

        # add all data to 'clean_history'
        videos.append({'pos':pos,'date':date.strftime('%d/%m/%y %H:%M'),'video':{'title':video_title,'description':video_description,'duration':str(video_duration),'category':video_categoryName,'thumbnails':video_thumbnails,'date':video_date,'location':video_location,'topics':video_topics,'tags':video_tags,'channel':{'title':channel_name,'id':channel_id,'country':channel_country}}})
    print("account " + str(json.load(open(f))[0]['snippet']['channelTitle']) + " done")

# create new path if necessary
if os.path.isdir(results_dir) == False:
    os.makedirs(results_dir)

# sort and save dicts in results files
for list_result, value in results.iteritems():
    if value:
        if list_result == 'average':
            if day_hm != {}:
                for day in Day:
                    if day in day_name_hm.keys():
                        day_name_hm[day] = day_name_hm[day] / (len(day_hm) / 7.0)
                    else:
                        day_name_hm[day] = 0
                average_li.append({'videos per':{'day of the week':{Day[0]:day_name_hm[Day[0]],Day[1]:day_name_hm[Day[1]],Day[2]:day_name_hm[Day[2]],Day[3]:day_name_hm[Day[3]],Day[4]:day_name_hm[Day[4]],Day[5]:day_name_hm[Day[5]],Day[6]:day_name_hm[Day[6]]},'year':average(year_hm),'month':average(month_hm),'day':average(day_hm),'channel':average(channel_hm)}})
            else:
                average_li.append({'videos per':None})

            if duration_hm != {}:
                nb = dur_sum = 0
                for key in duration_hm:
                    dur_sum += duration_hm[key] * key.total_seconds()
                    nb += duration_hm[key]
                average_li.append({'video length (min)':dur_sum/nb/60})
            else:
                average_li.append({'video length (min)':None})
            create_file(list_result,average_li)

        elif list_result == 'clean_history':
            create_file(list_result,videos)
        elif list_result == 'channels':
            create_file(list_result,sorted(channel_hm.items(), key=operator.itemgetter(1),reverse=True))
        elif list_result == 'topics':
            create_file(list_result,sorted(topic_hm.items(), key=operator.itemgetter(1),reverse=True))
        elif list_result == 'tags':
            create_file(list_result,sorted(tag_hm.items(), key=operator.itemgetter(1),reverse=True))
        elif list_result == 'country':
            create_file(list_result,sorted(country_hm.items(), key=operator.itemgetter(1),reverse=True))
        elif list_result == 'category':
            create_file(list_result,sorted(category_hm.items(), key=operator.itemgetter(1),reverse=True))
        elif list_result == 'days':
            create_file(list_result,sorted(day_hm.items(), key=operator.itemgetter(1),reverse=True))
        elif list_result == 'months':
            create_file(list_result,sorted(month_hm.items(), key=operator.itemgetter(1),reverse=True))
        elif list_result == 'years':
            # the pasted line called year_hm(...) on topic_hm; sort year_hm like the others
            create_file(list_result,sorted(year_hm.items(), key=operator.itemgetter(1),reverse=True))
#3
Hello,

This is very useful! I can't wait to analyze my history. Thank you :)

Line 29:
date_sorter =
videos =
average_li =
Should it be:

date_sorter = {}
videos = {}
average_li = {}
Also, would you have a plain-text version of this code? I get an indentation error, and I think something may be going wrong when I copy and paste.


Thank you !
#4
@[Redoudou]
It's an issue with our editor; those are supposed to be empty lists.
See the second bullet here:
https://python-forum.io/misc.php?action=help&hid=39
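For reference, a sketch of the three assignments with the stripped brackets restored. They need to be lists rather than dicts because the script calls `.append()` and `.sort()` on them; the two sample dates are only placeholders.

```python
from datetime import datetime

date_sorter = []  # watch dates, sorted newest first later on
videos = []       # one dict per video, written to clean_history.json
average_li = []   # entries written to average.json

# Placeholder values to show the list methods the script relies on.
date_sorter.append(datetime(2017, 8, 29))
date_sorter.append(datetime(2017, 8, 30))
date_sorter.sort(reverse=True)
```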
#5
Thank you @[metulburr],

I've made the correction.

However, I keep getting an indentation error in the following function.

# remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
good_kwargs = {}
if kwargs is not None:
for key, value in kwargs.iteritems():
if value:
good_kwargs[key] = value
return good_kwargs
Error:
line 47 good_kwargs = {} ^ IndentationError: expected an indented block
From my research, the syntax looks correct. I edited the file in nqq and checked that the line reads good_kwargs(single space)=(single space){},
but the error persists.

@[Aerosmite] Any idea where I should direct my research?

Thank you :)
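"IndentationError: expected an indented block" right after a `def` line means Python found no indented body: the pasted function has no leading whitespace, so every line under `def` has to be re-indented by hand. A sketch of the function with four-space indentation follows. It assumes Python 3, hence `.items()` in place of the `.iteritems()` used in the Python 2 original.

```python
# remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    good_kwargs = {}
    if kwargs is not None:
        for key, value in kwargs.items():  # .iteritems() on Python 2
            if value:
                good_kwargs[key] = value
    return good_kwargs

# Example call: the unset pageToken argument is dropped.
print(remove_empty_kwargs(part='snippet', id='abc123', pageToken=None))
```

The other functions and the loops in the script need the same treatment, with each nested block indented one level deeper than the line that opens it.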