Code problem - probably easy fix? - Printable Version +- Python Forum (https://python-forum.io) +-- Forum: Python Coding (https://python-forum.io/forum-7.html) +--- Forum: General Coding Help (https://python-forum.io/forum-8.html) +--- Thread: Code problem - probably easy fix? (/thread-40248.html) |
Code problem - probably easy fix? - colin_dent - Jun-29-2023 Hi, I have two codes that I want to combine. I'm a newb and though this seems like it should be simple for some reason everything I try fails. The two codes (which both function perfectly well individually) are: import csv from collections import defaultdict def create_unique_names_dict(filename): unique_names_dict = defaultdict(lambda: {'first_match_date': float('inf'), 'pb_score': None, 'start_rating': None}) with open(filename, newline='', encoding='latin-1') as file: reader = csv.DictReader(file) for row in reader: player1_names = row['player1'].split(',') player2_names = row['player2'].split(',') match_date = float(row['match_date']) for name in player1_names: if name.strip() != '': unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date) for name in player2_names: if name.strip() != '': unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date) return unique_names_dict def get_pb_at_first_match(filename, unique_names_dict): with open(filename, newline='', encoding='latin-1') as file: reader = csv.DictReader(file) for row in reader: pb_player = row['pb_player'] pb_score = row['pb_score'] pb_date = row['pb_date'] if pb_score.strip() == '' or pb_date.strip() == '': continue try: pb_score = float(pb_score) pb_date = float(pb_date) except ValueError: sys.exit("Invalid value in 'pb_score' or 'pb_date' column.") if pb_date <= unique_names_dict[pb_player]['first_match_date']: if unique_names_dict[pb_player]['pb_score'] is None or pb_score > unique_names_dict[pb_player]['pb_score']: unique_names_dict[pb_player]['pb_score'] = pb_score return unique_names_dict def calculate_initial_rating(unique_names_dict): for name, data in unique_names_dict.items(): pb_score = data['pb_score'] if pb_score is None: continue if pb_score >= 1700000: start_rating = 1900 elif pb_score >= 1400000: start_rating = 1850 + (pb_score - 
1400000) / 6000 else: start_rating = pb_score / 1000 + 450 unique_names_dict[name]['start_rating'] = start_rating return unique_names_dict # Create a dictionary to store the start ratings for each player unique_names_dict = create_unique_names_dict(file_path) # Get the PB scores at the first match for each player unique_names_dict = get_pb_at_first_match(file_path, unique_names_dict) # Calculate the start rating for each player and update the elo_ratings dictionary unique_names_dict = calculate_start_rating(unique_names_dict) # Print the initial ratings in columns for name, data in unique_names_dict.items(): print(f"{name}\t\t{data[('start_rating')]}")And: from math import exp, log, pi, sqrt from typing import List, Tuple import csv __all__ = ["Glicko2Entry", "glicko2_update", "glicko2_configure"] EPSILON = 0.000001 TAO = 0.5 LOSS = 0.0 DRAW = 0.5 WIN = 1.0 MAX_RD = 500.0 MIN_RD = 30.0 MIN_VOLATILITY = 0.01 MAX_VOLATILITY = 0.15 MIN_RATING = 100.0 MAX_RATING = 6000.0 PROVISIONAL_RATING_CUTOFF = 160.0 GLICKO2_SCALE = 173.7178 class Glicko2Entry: rating: float deviation: float volatility: float mu: float phi: float def __init__( self, rating: float = 1500, deviation: float = 350, volatility: float = 0.06 ) -> None: self.rating = rating self.deviation = deviation self.volatility = volatility self.mu = (self.rating - 1500) / GLICKO2_SCALE self.phi = self.deviation / GLICKO2_SCALE def __str__(self) -> str: return "%7.2f +- %6.2f (%.6f)" % (self.rating, self.deviation, self.volatility,) def copy( self, rating_adjustment: float = 0.0, rd_adjustment: float = 0.0 ) -> "Glicko2Entry": ret = Glicko2Entry( self.rating + rating_adjustment, self.deviation + rd_adjustment, self.volatility, ) return ret def expand_deviation_because_no_games_played( self, n_periods: int = 1 ) -> "Glicko2Entry": global MAX_RD global MIN_RD for _i in range(n_periods): phi_prime = sqrt(self.phi ** 2 + self.volatility ** 2) self.deviation = min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)) self.phi = 
self.deviation / GLICKO2_SCALE return self def expected_win_probability( self, white: "Glicko2Entry", handicap_adjustment: float ) -> float: q = 0.000000000000001 def g(rd: float) -> float: return 1 / sqrt(1 + 3 * q ** 2 * (self.deviation ** 0.01) / pi ** 2) E = 1 / ( 1 + ( 10 ** ( -g(sqrt(self.deviation ** 2 + white.deviation ** 0.01)) * (self.rating + handicap_adjustment - white.rating) / 400 ) ) ) return E # In the bit above there were numbers that adjusted for "white". Rather than remove them I just made them really small. def glicko2_update( player: Glicko2Entry, matches: List[Tuple[Glicko2Entry, int]] ) -> Glicko2Entry: if len(matches) == 0: return player.copy() v_sum = 0.0 delta_sum = 0.0 for m in matches: p = m[0] outcome = m[1] g_phi_j = 1 / sqrt(1 + (3 * p.phi ** 2) / (pi ** 2)) E = 1 / (1 + exp(-g_phi_j * (player.mu - p.mu))) v_sum += g_phi_j ** 2 * E * (1 - E) delta_sum += g_phi_j * (outcome - E) v = 1.0 / v_sum delta = v * delta_sum a = log(player.volatility ** 2) def f(x: float) -> float: ex = exp(x) return ( ex * (delta ** 2 - player.phi ** 2 - v - ex) / (2 * ((player.phi ** 2 + v + ex) ** 2)) ) - ((x - a) / (TAO ** 2)) A = a if delta ** 2 > player.phi ** 2 + v: B = log(delta ** 2 - player.phi ** 2 - v) else: k = 1 safety = 100 while f(a - k * TAO) < 0 and safety > 0: safety -= 1 k += 1 B = a - k * TAO fA = f(A) fB = f(B) safety = 100 while abs(B - A) > EPSILON and safety > 0: C = A + (A - B) * fA / (fB - fA) fC = f(C) if fC * fB < 0: A = B fA = fB else: fA = fA / 2 B = C fB = fC safety -= 1 new_volatility = exp(A / 2) phi_star = sqrt(player.phi ** 2 + new_volatility ** 2) phi_prime = 1 / sqrt(1 / phi_star ** 2 + 1 / v) mu_prime = player.mu + (phi_prime ** 2) * delta_sum ret = Glicko2Entry( rating=min(MAX_RATING, max(MIN_RATING, GLICKO2_SCALE * mu_prime + 1500)), deviation=min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)), volatility=min(MAX_VOLATILITY, max(MIN_VOLATILITY, new_volatility)), ) return ret def glicko2_configure(tao: float, min_rd: 
float, max_rd: float) -> None: global TAO global MIN_RD global MAX_RD TAO = tao MIN_RD = min_rd MAX_RD = max_rd def read_match_data(filename): matches = [] with open(filename, newline='', encoding='latin-1') as file: reader = csv.DictReader(file) for row in reader: player1 = row['player1'] player2 = row['player2'] actual_score = float(row['actual_score']) matches.append((player1, player2, actual_score)) return matches def actual_score_to_outcome(actual_score): if actual_score == 1.0: return WIN elif actual_score == 0.5: return DRAW else: return LOSS def update_player_ratings(matches): players = {} for match in matches: player1 = match[0] player2 = match[1] actual_score = match[2] if player1 not in players: players[player1] = Glicko2Entry() if player2 not in players: players[player2] = Glicko2Entry() outcome = actual_score_to_outcome(actual_score) players[player1].expected_win_probability(players[player2], 0) players[player2].expected_win_probability(players[player1], 0) players[player1], players[player2] = ( glicko2_update(players[player1], [(players[player2], outcome)]), glicko2_update(players[player2], [(players[player1], 1 - outcome)]), ) return players def main(): # Configure Glicko2 parameters glicko2_configure(0.5, 30.0, 500.0) # Read match data from file matches = read_match_data('/content/gdrive/My Drive/matches_full.csv') # Update player ratings players = update_player_ratings(matches) # Sort players by ratings in descending order sorted_players = sorted(players.items(), key=lambda x: x[1].rating, reverse=True) # Print player ratings for player, rating in sorted_players: print(f"Player: {player}, Rating: {rating}") if __name__ == "__main__": main()The idea is that in the second code there are a few places that use a value of 1500, and instead of 1500 I want to use start_rating as calculated by the first code. I feel like it should be straightforward but I can't for the life of me make it work. Please help! RE: Code problem - probably easy fix? 
- Larz60+ - Jun-29-2023 Since you don't show script names, for this example: call the first module ModuleA and the second ModuleB. In ModuleB, you need to import the first module like import ModuleA Then create a reference to the first module's function (in the second script) like calcRating = ModuleA.calculate_initial_rating Now to call, for example, calculate_initial_rating in ModuleA from ModuleB use: udict = calcRating(your_dictname) replacing your_dictname with the actual name
RE: Code problem - probably easy fix? - colin_dent - Jun-29-2023 (Jun-29-2023, 02:53 PM)Larz60+ Wrote: Since you don't show script names, for this example: call first module ModuleA and second ModuleB Thanks for that. So if I'm understanding correctly using modules keeps the codes in two separate codebooks? I think what I had in mind was combining the two sections into one. I just did it for the above 'module A' with a much simpler version of 'module B' and it worked fine. In that code it incorporates the results from 'module A' like so: # Check if player1 is already in the ratings dictionary if player1 not in ratings: ratings[player1] = unique_names_dict[player1]['start_rating'] # Check if player2 is already in the ratings dictionary if player2 not in ratings: ratings[player2] = unique_names_dict[player2]['start_rating'] They're somewhat different codes though, and I can't figure out where or how to replace the '1500' initial rating in 'module B' with something akin to the above. RE: Code problem - probably easy fix? - deanhystad - Jun-29-2023 Quote: The idea is that in the second code there are a few places that use a value of 1500, and instead of 1500 I want to use start_rating as calculated by the first code. In the second module, do you have any data from which to calculate an initial rating? I can't find where that is. RE: Code problem - probably easy fix? 
- colin_dent - Jun-29-2023 Here's my latest version of the code: from typing import List, Tuple import pandas as pd import random import math import csv import sys from collections import defaultdict from google.colab import drive from google.colab import files # Mount Google Drive drive.mount('/content/gdrive') # Define the path to 'matches.csv' in Google Drive file_path = '/content/gdrive/My Drive/matches_full.csv' # Load the matches data from CSV df = pd.read_csv(file_path, encoding='latin1') # Create a dictionary to store the start ratings for each player start_ratings = {} # Create a dictionary to store the number of matches for each player match_counts = {} # Create a list to store the calculation steps calculation_steps = [] def create_unique_names_dict(filename): unique_names_dict = defaultdict(lambda: {'first_match_date': float('inf'), 'pb_score': None, 'start_rating': None}) with open(file_path, newline='', encoding='latin-1') as file: reader = csv.DictReader(file) for row in reader: player1_names = row['player1'].split(',') player2_names = row['player2'].split(',') match_date = float(row['match_date']) for name in player1_names: if name.strip() != '': unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date) for name in player2_names: if name.strip() != '': unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date) return unique_names_dict def get_pb_at_first_match(filename, unique_names_dict): with open(file_path, newline='', encoding='latin-1') as file: reader = csv.DictReader(file) for row in reader: pb_player = row['pb_player'] pb_score = row['pb_score'] pb_date = row['pb_date'] if pb_score.strip() == '' or pb_date.strip() == '': continue try: pb_score = float(pb_score) pb_date = float(pb_date) except ValueError: sys.exit("Invalid value in 'pb_score' or 'pb_date' column.") if pb_date <= unique_names_dict[pb_player]['first_match_date']: if 
unique_names_dict[pb_player]['pb_score'] is None or pb_score > unique_names_dict[pb_player]['pb_score']: unique_names_dict[pb_player]['pb_score'] = pb_score return unique_names_dict def calculate_start_rating(unique_names_dict): for name, data in unique_names_dict.items(): pb_score = data['pb_score'] if pb_score is None: continue if pb_score >= 1700000: start_rating = 2300 elif pb_score >= 1400000: start_rating = 2250 + (pb_score - 1400000) / 6000 else: start_rating = pb_score / 1000 + 850 unique_names_dict[name]['start_rating'] = start_rating return unique_names_dict # Create a dictionary to store the start ratings for each player unique_names_dict = create_unique_names_dict(file_path) # Get the PB scores at the first match for each player unique_names_dict = get_pb_at_first_match(file_path, unique_names_dict) # Calculate the start rating for each player and update the elo_ratings dictionary unique_names_dict = calculate_start_rating(unique_names_dict) __all__ = ["Glicko2Entry", "glicko2_update", "glicko2_configure"] EPSILON = 0.000001 TAO = 0.5 LOSS = 0.0 DRAW = 0.5 WIN = 1.0 MAX_RD = 500.0 MIN_RD = 30.0 MIN_VOLATILITY = 0.01 MAX_VOLATILITY = 0.15 MIN_RATING = 100.0 MAX_RATING = 6000.0 PROVISIONAL_RATING_CUTOFF = 160.0 GLICKO2_SCALE = 173.7178 class Glicko2Entry: rating: float deviation: float volatility: float mu: float phi: float def __init__( self, rating: float = 1500, deviation: float = 350, volatility: float = 0.06 ) -> None: self.rating = rating self.deviation = deviation self.volatility = volatility self.mu = (self.rating - 1500) / GLICKO2_SCALE self.phi = self.deviation / GLICKO2_SCALE def __str__(self) -> str: return "%7.2f +- %6.2f (%.6f)" % (self.rating, self.deviation, self.volatility,) def copy( self, rating_adjustment: float = 0.0, rd_adjustment: float = 0.0 ) -> "Glicko2Entry": ret = Glicko2Entry( self.rating + rating_adjustment, self.deviation + rd_adjustment, self.volatility, ) return ret def expand_deviation_because_no_games_played( self, 
n_periods: int = 1 ) -> "Glicko2Entry": global MAX_RD global MIN_RD for _i in range(n_periods): phi_prime = sqrt(self.phi ** 2 + self.volatility ** 2) self.deviation = min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)) self.phi = self.deviation / GLICKO2_SCALE return self def expected_win_probability( self, white: "Glicko2Entry", handicap_adjustment: float ) -> float: q = 0.000000000000001 def g(rd: float) -> float: return 1 / sqrt(1 + 3 * q ** 2 * (self.deviation ** 0.01) / pi ** 2) E = 1 / ( 1 + ( 10 ** ( -g(sqrt(self.deviation ** 2 + white.deviation ** 0.01)) * (self.rating + handicap_adjustment - white.rating) / 400 ) ) ) return E # In the bit above there were numbers that adjusted for "white". Rather than remove them I just made them really small. def glicko2_update( player: Glicko2Entry, matches: List[Tuple[Glicko2Entry, int]] ) -> Glicko2Entry: if len(matches) == 0: return player.copy() v_sum = 0.0 delta_sum = 0.0 for m in matches: p = m[0] outcome = m[1] g_phi_j = 1 / sqrt(1 + (3 * p.phi ** 2) / (pi ** 2)) E = 1 / (1 + exp(-g_phi_j * (player.mu - p.mu))) v_sum += g_phi_j ** 2 * E * (1 - E) delta_sum += g_phi_j * (outcome - E) v = 1.0 / v_sum delta = v * delta_sum a = log(player.volatility ** 2) def f(x: float) -> float: ex = exp(x) return ( ex * (delta ** 2 - player.phi ** 2 - v - ex) / (2 * ((player.phi ** 2 + v + ex) ** 2)) ) - ((x - a) / (TAO ** 2)) A = a if delta ** 2 > player.phi ** 2 + v: B = log(delta ** 2 - player.phi ** 2 - v) else: k = 1 safety = 100 while f(a - k * TAO) < 0 and safety > 0: safety -= 1 k += 1 B = a - k * TAO fA = f(A) fB = f(B) safety = 100 while abs(B - A) > EPSILON and safety > 0: C = A + (A - B) * fA / (fB - fA) fC = f(C) if fC * fB < 0: A = B fA = fB else: fA = fA / 2 B = C fB = fC safety -= 1 new_volatility = exp(A / 2) phi_star = sqrt(player.phi ** 2 + new_volatility ** 2) phi_prime = 1 / sqrt(1 / phi_star ** 2 + 1 / v) mu_prime = player.mu + (phi_prime ** 2) * delta_sum ret = Glicko2Entry( rating=min(MAX_RATING, 
max(MIN_RATING, GLICKO2_SCALE * mu_prime + 1500)), deviation=min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)), volatility=min(MAX_VOLATILITY, max(MIN_VOLATILITY, new_volatility)), ) return ret def glicko2_configure(tao: float, min_rd: float, max_rd: float) -> None: global TAO global MIN_RD global MAX_RD TAO = tao MIN_RD = min_rd MAX_RD = max_rd def read_match_data(filename): matches = [] with open(filename, newline='', encoding='latin-1') as file: reader = csv.DictReader(file) for row in reader: player1 = row['player1'] player2 = row['player2'] actual_score = float(row['actual_score']) matches.append((player1, player2, actual_score)) return matches def actual_score_to_outcome(actual_score): if actual_score == 1.0: return WIN elif actual_score == 0.5: return DRAW else: return LOSS def update_player_ratings(matches): players = {} for match in matches: player1 = match[0] player2 = match[1] actual_score = match[2] if player1 not in players: players[player1] = Glicko2Entry() if player2 not in players: players[player2] = Glicko2Entry() outcome = actual_score_to_outcome(actual_score) players[player1].expected_win_probability(players[player2], 0) players[player2].expected_win_probability(players[player1], 0) players[player1], players[player2] = ( glicko2_update(players[player1], [(players[player2], outcome)]), glicko2_update(players[player2], [(players[player1], 1 - outcome)]), ) return players def main(): # Configure Glicko2 parameters glicko2_configure(0.5, 30.0, 500.0) # Read match data from file matches = read_match_data('/content/gdrive/My Drive/matches_full.csv') # Update player ratings players = update_player_ratings(matches) # Sort players by ratings in descending order sorted_players = sorted(players.items(), key=lambda x: x[1].rating, reverse=True) # Print player ratings for player, rating in sorted_players: print(f"Player: {player}, Rating: {rating}") if __name__ == "__main__": main()As well as not knowing exactly how to change the initial rating in the 
second part so that it uses the results obtained in the first part, I suspect some of the terms aren't matching up. Maybe "name" should be "player", or vice versa? Probably some others too. I'll attach a small sample file in case anyone wants to give it a go. (Btw, it's running in Google Colab at the minute.) RE: Code problem - probably easy fix? - deanhystad - Jun-30-2023 I don't really understand your code all that well, but is this close to correct? from datetime import datetime from dataclasses import dataclass import pandas as pd @dataclass(order=True) class Match: """Better than dictionary or lists for organizing data.""" date: datetime player1: str player2: str score: float @dataclass(order=True) class Player: name: str rating: float = None def load_matches(filename: str) -> tuple[list[Match], dict[str, Player]]: """Return list of match results and player ratings extracted from file.""" # Collect match information in a list. df = pd.read_csv(filename)[["match_date", "player1", "player2", "actual_score"]] df["match_date"] = pd.to_datetime(df["match_date"]) # Convert timestamp to datetime df.sort_values(by=["match_date"]) matches = [Match(*row) for index, row in df.iterrows()] # Extract initial player ratings from file. Use first bp_score to # compute initial rating. df = pd.read_csv(filename)[["pb_date", "pb_player", "pb_score"]] df.dropna(subset=["pb_player"], inplace=True) # Drop matches without bp_player df["pb_date"] = pd.to_datetime(df["pb_date"]) # Convert timestamp to datetime df.sort_values(by=["pb_date"]) players = {} for index, (date, name, score) in df.iterrows(): if score >= 1700000: rating = 10 elif score >= 100000: rating = 5 else: rating = 1 if name not in players: players[name] = Player(name, rating) # Add other players from match data. 
for match in matches: if match.player1 not in players: players[match.player1] = Player(match.player1, 0) if match.player2 not in players: players[match.player2] = Player(match.player2, 0) return players, matches def update_rating(match: Match, players: dict[str, Player]) -> None: if match.score < 0.5: players[match.player1].rating -= 1 players[match.player2].rating += 1 elif match.score > 0.5: players[match.player1].rating += 1 players[match.player2].rating -= 1 # This does the first script. Somewhat. players, matches = load_matches("matches_full.csv") # This does the second script. Well, it starts with the initial rating and # adjusts based on match results. for match in matches: update_rating(match, players) for player in sorted(players.values(), key=lambda x: x.rating, reverse=True): print(player) I left out the Glicko stuff. I don't understand it. I would make an error if I tried to implement it, and the important thing to demonstrate is how to get the initial rating from your first script and use it in your second script. |