Python Forum
Code problem - probably easy fix?
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Code problem - probably easy fix?
#1
Hi, I have two codes that I want to combine. I'm a newb and though this seems like it should be simple for some reason everything I try fails. The two codes (which both function perfectly well individually) are:

import csv
from collections import defaultdict

def create_unique_names_dict(filename):
    unique_names_dict = defaultdict(lambda: {'first_match_date': float('inf'), 'pb_score': None, 'start_rating': None})

    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1_names = row['player1'].split(',')
            player2_names = row['player2'].split(',')
            match_date = float(row['match_date'])

            for name in player1_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

            for name in player2_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

    return unique_names_dict

def get_pb_at_first_match(filename, unique_names_dict):
    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            pb_player = row['pb_player']
            pb_score = row['pb_score']
            pb_date = row['pb_date']

            if pb_score.strip() == '' or pb_date.strip() == '':
                continue

            try:
                pb_score = float(pb_score)
                pb_date = float(pb_date)
            except ValueError:
                sys.exit("Invalid value in 'pb_score' or 'pb_date' column.")

            if pb_date <= unique_names_dict[pb_player]['first_match_date']:
                if unique_names_dict[pb_player]['pb_score'] is None or pb_score > unique_names_dict[pb_player]['pb_score']:
                    unique_names_dict[pb_player]['pb_score'] = pb_score

    return unique_names_dict

def calculate_initial_rating(unique_names_dict):
    for name, data in unique_names_dict.items():
        pb_score = data['pb_score']

        if pb_score is None:
            continue

        if pb_score >= 1700000:
            start_rating = 1900
        elif pb_score >= 1400000:
            start_rating = 1850 + (pb_score - 1400000) / 6000
        else:
            start_rating = pb_score / 1000 + 450

        unique_names_dict[name]['start_rating'] = start_rating

    return unique_names_dict

# Create a dictionary to store the start ratings for each player
unique_names_dict = create_unique_names_dict(file_path)

# Get the PB scores at the first match for each player
unique_names_dict = get_pb_at_first_match(file_path, unique_names_dict)

# Calculate the start rating for each player and update the elo_ratings dictionary
unique_names_dict = calculate_start_rating(unique_names_dict)

# Print the initial ratings in columns
for name, data in unique_names_dict.items():
    print(f"{name}\t\t{data[('start_rating')]}")
And:

from math import exp, log, pi, sqrt
from typing import List, Tuple
import csv

__all__ = ["Glicko2Entry", "glicko2_update", "glicko2_configure"]

EPSILON = 0.000001
TAO = 0.5
LOSS = 0.0
DRAW = 0.5
WIN = 1.0
MAX_RD = 500.0
MIN_RD = 30.0
MIN_VOLATILITY = 0.01
MAX_VOLATILITY = 0.15
MIN_RATING = 100.0
MAX_RATING = 6000.0
PROVISIONAL_RATING_CUTOFF = 160.0
GLICKO2_SCALE = 173.7178

class Glicko2Entry:
    rating: float
    deviation: float
    volatility: float
    mu: float
    phi: float

    def __init__(
        self, rating: float = 1500, deviation: float = 350, volatility: float = 0.06
    ) -> None:
        self.rating = rating
        self.deviation = deviation
        self.volatility = volatility
        self.mu = (self.rating - 1500) / GLICKO2_SCALE
        self.phi = self.deviation / GLICKO2_SCALE

    def __str__(self) -> str:
        return "%7.2f +- %6.2f (%.6f)" % (self.rating, self.deviation, self.volatility,)

    def copy(
        self, rating_adjustment: float = 0.0, rd_adjustment: float = 0.0
    ) -> "Glicko2Entry":
        ret = Glicko2Entry(
            self.rating + rating_adjustment,
            self.deviation + rd_adjustment,
            self.volatility,
        )
        return ret

    def expand_deviation_because_no_games_played(
        self, n_periods: int = 1
    ) -> "Glicko2Entry":
        global MAX_RD
        global MIN_RD

        for _i in range(n_periods):
            phi_prime = sqrt(self.phi ** 2 + self.volatility ** 2)
            self.deviation = min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime))
            self.phi = self.deviation / GLICKO2_SCALE

        return self

    def expected_win_probability(
        self, white: "Glicko2Entry", handicap_adjustment: float
    ) -> float:
        q = 0.000000000000001

        def g(rd: float) -> float:
            return 1 / sqrt(1 + 3 * q ** 2 * (self.deviation ** 0.01) / pi ** 2)

        E = 1 / (
            1
            + (
                10
                ** (
                    -g(sqrt(self.deviation ** 2 + white.deviation ** 0.01))
                    * (self.rating + handicap_adjustment - white.rating)
                    / 400
                )
            )
        )
        return E

# In the bit above there were numbers that adjusted for "white". Rather than remove them I just made them really small.

def glicko2_update(
    player: Glicko2Entry, matches: List[Tuple[Glicko2Entry, int]]
) -> Glicko2Entry:
    if len(matches) == 0:
        return player.copy()

    v_sum = 0.0
    delta_sum = 0.0
    for m in matches:
        p = m[0]
        outcome = m[1]
        g_phi_j = 1 / sqrt(1 + (3 * p.phi ** 2) / (pi ** 2))
        E = 1 / (1 + exp(-g_phi_j * (player.mu - p.mu)))
        v_sum += g_phi_j ** 2 * E * (1 - E)
        delta_sum += g_phi_j * (outcome - E)

    v = 1.0 / v_sum
    delta = v * delta_sum

    a = log(player.volatility ** 2)

    def f(x: float) -> float:
        ex = exp(x)
        return (
            ex
            * (delta ** 2 - player.phi ** 2 - v - ex)
            / (2 * ((player.phi ** 2 + v + ex) ** 2))
        ) - ((x - a) / (TAO ** 2))

    A = a
    if delta ** 2 > player.phi ** 2 + v:
        B = log(delta ** 2 - player.phi ** 2 - v)
    else:
        k = 1
        safety = 100
        while f(a - k * TAO) < 0 and safety > 0:
            safety -= 1
            k += 1
        B = a - k * TAO

    fA = f(A)
    fB = f(B)
    safety = 100

    while abs(B - A) > EPSILON and safety > 0:
        C = A + (A - B) * fA / (fB - fA)
        fC = f(C)
        if fC * fB < 0:
            A = B
            fA = fB
        else:
            fA = fA / 2
        B = C
        fB = fC

        safety -= 1

    new_volatility = exp(A / 2)

    phi_star = sqrt(player.phi ** 2 + new_volatility ** 2)

    phi_prime = 1 / sqrt(1 / phi_star ** 2 + 1 / v)
    mu_prime = player.mu + (phi_prime ** 2) * delta_sum

    ret = Glicko2Entry(
        rating=min(MAX_RATING, max(MIN_RATING, GLICKO2_SCALE * mu_prime + 1500)),
        deviation=min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)),
        volatility=min(MAX_VOLATILITY, max(MIN_VOLATILITY, new_volatility)),
    )
    return ret

def glicko2_configure(tao: float, min_rd: float, max_rd: float) -> None:
    global TAO
    global MIN_RD
    global MAX_RD

    TAO = tao
    MIN_RD = min_rd
    MAX_RD = max_rd

def read_match_data(filename):
    matches = []
    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1 = row['player1']
            player2 = row['player2']
            actual_score = float(row['actual_score'])
            matches.append((player1, player2, actual_score))
    return matches

def actual_score_to_outcome(actual_score):
    if actual_score == 1.0:
        return WIN
    elif actual_score == 0.5:
        return DRAW
    else:
        return LOSS

def update_player_ratings(matches):
    players = {}

    for match in matches:
        player1 = match[0]
        player2 = match[1]
        actual_score = match[2]

        if player1 not in players:
            players[player1] = Glicko2Entry()
        if player2 not in players:
            players[player2] = Glicko2Entry()

        outcome = actual_score_to_outcome(actual_score)
        players[player1].expected_win_probability(players[player2], 0)
        players[player2].expected_win_probability(players[player1], 0)

        players[player1], players[player2] = (
            glicko2_update(players[player1], [(players[player2], outcome)]),
            glicko2_update(players[player2], [(players[player1], 1 - outcome)]),
        )

    return players

def main():
    # Configure Glicko2 parameters
    glicko2_configure(0.5, 30.0, 500.0)

    # Read match data from file
    matches = read_match_data('/content/gdrive/My Drive/matches_full.csv')

    # Update player ratings
    players = update_player_ratings(matches)

    # Sort players by ratings in descending order
    sorted_players = sorted(players.items(), key=lambda x: x[1].rating, reverse=True)

    # Print player ratings
    for player, rating in sorted_players:
        print(f"Player: {player}, Rating: {rating}")

if __name__ == "__main__":
    main()
The idea is that in the second code there are a few places that use a value of 1500, and instead of 1500 I want to use start_rating as calculated by the first code.

I feel like it should be straightforward but I can't for the life of me make it work. Please help!
Reply
#2
Since you don't show script names, for this example: call first module ModuleA and second ModuleB

in ModuleB, you need to import first module
like import ModuleA

Then create an instance of first module (in second script) like calcRating = ModuleA().calculate_initial_rating

Now to call, for example, calculate_initial_rating in ModuleA from ModuleB
use: udict = calcrating(your_dctname) replacing your_dictname with actual name
Reply
#3
(Jun-29-2023, 02:53 PM)Larz60+ Wrote: Since you don't show script names, for this example: call first module ModuleA and second ModuleB

in ModuleB, you need to import first module
like import ModuleA

Then create an instance of first module (in second script) like calcRating = ModuleA().calculate_initial_rating

Now to call, for example, calculate_initial_rating in ModuleA from ModuleB
use: udict = calcrating(your_dctname) replacing your_dictname with actual name

Thanks for that. So if I'm understanding correctly using modules keeps the codes in two separate codebooks?

I think what I had in mind was in combining the two sections into one. I just did it for the above 'module A' with a much simpler version of 'module B' and it worked fine. In that code it incorporates the results from 'module A' like so:

    # Check if player1 is already in the ratings dictionary
    if player1 not in ratings:
        ratings[player1] = unique_names_dict[player1]['start_rating']

    # Check if player2 is already in the ratings dictionary
    if player2 not in ratings:
        ratings[player2] = unique_names_dict[player2]['start_rating']
They're somewhat different codes though, and I can't figure out where or how to replace the '1500' initial rating in 'module B' with something akin to the above.
Reply
#4
Quote:The idea is that in the second code there are a few places that use a value of 1500, and instead of 1500 I want to use start_rating as calculated by the first code
In the second module, do you have any data from which to calculate an initial rating? I can't find where that is.
Reply
#5
Here's my latest version of the code:

from typing import List, Tuple
import pandas as pd
import random
import math
import csv
import sys
from collections import defaultdict
from google.colab import drive
from google.colab import files

# Mount Google Drive
drive.mount('/content/gdrive')

# Define the path to 'matches.csv' in Google Drive
file_path = '/content/gdrive/My Drive/matches_full.csv'

# Load the matches data from CSV
df = pd.read_csv(file_path, encoding='latin1')

# Create a dictionary to store the start ratings for each player
start_ratings = {}

# Create a dictionary to store the number of matches for each player
match_counts = {}

# Create a list to store the calculation steps
calculation_steps = []

def create_unique_names_dict(filename):
    unique_names_dict = defaultdict(lambda: {'first_match_date': float('inf'), 'pb_score': None, 'start_rating': None})

    with open(file_path, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1_names = row['player1'].split(',')
            player2_names = row['player2'].split(',')
            match_date = float(row['match_date'])

            for name in player1_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

            for name in player2_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

    return unique_names_dict

def get_pb_at_first_match(filename, unique_names_dict):
    with open(file_path, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            pb_player = row['pb_player']
            pb_score = row['pb_score']
            pb_date = row['pb_date']

            if pb_score.strip() == '' or pb_date.strip() == '':
                continue

            try:
                pb_score = float(pb_score)
                pb_date = float(pb_date)
            except ValueError:
                sys.exit("Invalid value in 'pb_score' or 'pb_date' column.")

            if pb_date <= unique_names_dict[pb_player]['first_match_date']:
                if unique_names_dict[pb_player]['pb_score'] is None or pb_score > unique_names_dict[pb_player]['pb_score']:
                    unique_names_dict[pb_player]['pb_score'] = pb_score

    return unique_names_dict

def calculate_start_rating(unique_names_dict):
    for name, data in unique_names_dict.items():
        pb_score = data['pb_score']

        if pb_score is None:
            continue

        if pb_score >= 1700000:
            start_rating = 2300
        elif pb_score >= 1400000:
            start_rating = 2250 + (pb_score - 1400000) / 6000
        else:
            start_rating = pb_score / 1000 + 850

        unique_names_dict[name]['start_rating'] = start_rating

    return unique_names_dict

# Create a dictionary to store the start ratings for each player
unique_names_dict = create_unique_names_dict(file_path)

# Get the PB scores at the first match for each player
unique_names_dict = get_pb_at_first_match(file_path, unique_names_dict)

# Calculate the start rating for each player and update the elo_ratings dictionary
unique_names_dict = calculate_start_rating(unique_names_dict)

__all__ = ["Glicko2Entry", "glicko2_update", "glicko2_configure"]

EPSILON = 0.000001
TAO = 0.5
LOSS = 0.0
DRAW = 0.5
WIN = 1.0
MAX_RD = 500.0
MIN_RD = 30.0
MIN_VOLATILITY = 0.01
MAX_VOLATILITY = 0.15
MIN_RATING = 100.0
MAX_RATING = 6000.0
PROVISIONAL_RATING_CUTOFF = 160.0
GLICKO2_SCALE = 173.7178

class Glicko2Entry:
    rating: float
    deviation: float
    volatility: float
    mu: float
    phi: float

    def __init__(
        self, rating: float = 1500, deviation: float = 350, volatility: float = 0.06
    ) -> None:
        self.rating = rating
        self.deviation = deviation
        self.volatility = volatility
        self.mu = (self.rating - 1500) / GLICKO2_SCALE
        self.phi = self.deviation / GLICKO2_SCALE

    def __str__(self) -> str:
        return "%7.2f +- %6.2f (%.6f)" % (self.rating, self.deviation, self.volatility,)

    def copy(
        self, rating_adjustment: float = 0.0, rd_adjustment: float = 0.0
    ) -> "Glicko2Entry":
        ret = Glicko2Entry(
            self.rating + rating_adjustment,
            self.deviation + rd_adjustment,
            self.volatility,
        )
        return ret

    def expand_deviation_because_no_games_played(
        self, n_periods: int = 1
    ) -> "Glicko2Entry":
        global MAX_RD
        global MIN_RD

        for _i in range(n_periods):
            phi_prime = sqrt(self.phi ** 2 + self.volatility ** 2)
            self.deviation = min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime))
            self.phi = self.deviation / GLICKO2_SCALE

        return self

    def expected_win_probability(
        self, white: "Glicko2Entry", handicap_adjustment: float
    ) -> float:
        q = 0.000000000000001

        def g(rd: float) -> float:
            return 1 / sqrt(1 + 3 * q ** 2 * (self.deviation ** 0.01) / pi ** 2)

        E = 1 / (
            1
            + (
                10
                ** (
                    -g(sqrt(self.deviation ** 2 + white.deviation ** 0.01))
                    * (self.rating + handicap_adjustment - white.rating)
                    / 400
                )
            )
        )
        return E

# In the bit above there were numbers that adjusted for "white". Rather than remove them I just made them really small.

def glicko2_update(
    player: Glicko2Entry, matches: List[Tuple[Glicko2Entry, int]]
) -> Glicko2Entry:
    if len(matches) == 0:
        return player.copy()

    v_sum = 0.0
    delta_sum = 0.0
    for m in matches:
        p = m[0]
        outcome = m[1]
        g_phi_j = 1 / sqrt(1 + (3 * p.phi ** 2) / (pi ** 2))
        E = 1 / (1 + exp(-g_phi_j * (player.mu - p.mu)))
        v_sum += g_phi_j ** 2 * E * (1 - E)
        delta_sum += g_phi_j * (outcome - E)

    v = 1.0 / v_sum
    delta = v * delta_sum

    a = log(player.volatility ** 2)

    def f(x: float) -> float:
        ex = exp(x)
        return (
            ex
            * (delta ** 2 - player.phi ** 2 - v - ex)
            / (2 * ((player.phi ** 2 + v + ex) ** 2))
        ) - ((x - a) / (TAO ** 2))

    A = a
    if delta ** 2 > player.phi ** 2 + v:
        B = log(delta ** 2 - player.phi ** 2 - v)
    else:
        k = 1
        safety = 100
        while f(a - k * TAO) < 0 and safety > 0:
            safety -= 1
            k += 1
        B = a - k * TAO

    fA = f(A)
    fB = f(B)
    safety = 100

    while abs(B - A) > EPSILON and safety > 0:
        C = A + (A - B) * fA / (fB - fA)
        fC = f(C)
        if fC * fB < 0:
            A = B
            fA = fB
        else:
            fA = fA / 2
        B = C
        fB = fC

        safety -= 1

    new_volatility = exp(A / 2)

    phi_star = sqrt(player.phi ** 2 + new_volatility ** 2)

    phi_prime = 1 / sqrt(1 / phi_star ** 2 + 1 / v)
    mu_prime = player.mu + (phi_prime ** 2) * delta_sum

    ret = Glicko2Entry(
        rating=min(MAX_RATING, max(MIN_RATING, GLICKO2_SCALE * mu_prime + 1500)),
        deviation=min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)),
        volatility=min(MAX_VOLATILITY, max(MIN_VOLATILITY, new_volatility)),
    )
    return ret

def glicko2_configure(tao: float, min_rd: float, max_rd: float) -> None:
    global TAO
    global MIN_RD
    global MAX_RD

    TAO = tao
    MIN_RD = min_rd
    MAX_RD = max_rd

def read_match_data(filename):
    matches = []
    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1 = row['player1']
            player2 = row['player2']
            actual_score = float(row['actual_score'])
            matches.append((player1, player2, actual_score))
    return matches

def actual_score_to_outcome(actual_score):
    if actual_score == 1.0:
        return WIN
    elif actual_score == 0.5:
        return DRAW
    else:
        return LOSS

def update_player_ratings(matches):
    players = {}

    for match in matches:
        player1 = match[0]
        player2 = match[1]
        actual_score = match[2]

        if player1 not in players:
            players[player1] = Glicko2Entry()
        if player2 not in players:
            players[player2] = Glicko2Entry()

        outcome = actual_score_to_outcome(actual_score)
        players[player1].expected_win_probability(players[player2], 0)
        players[player2].expected_win_probability(players[player1], 0)

        players[player1], players[player2] = (
            glicko2_update(players[player1], [(players[player2], outcome)]),
            glicko2_update(players[player2], [(players[player1], 1 - outcome)]),
        )

    return players

def main():
    # Configure Glicko2 parameters
    glicko2_configure(0.5, 30.0, 500.0)

    # Read match data from file
    matches = read_match_data('/content/gdrive/My Drive/matches_full.csv')

    # Update player ratings
    players = update_player_ratings(matches)

    # Sort players by ratings in descending order
    sorted_players = sorted(players.items(), key=lambda x: x[1].rating, reverse=True)

    # Print player ratings
    for player, rating in sorted_players:
        print(f"Player: {player}, Rating: {rating}")

if __name__ == "__main__":
    main()
As well as not knowing exactly how to change the initial rating in the second part so that it uses the results obtained in the first part, I suspect some of the terms aren't matching up. Maybe "name" should be "player", or vice versa? Probably some others too.

I'll attach a small sample file in case anyone wants to give it a go.

(Btw, it's running in Google Colab at the minute.)

Attached Files

.csv   matches_full.csv (Size: 8.16 KB / Downloads: 29)
Reply
#6
I don't really understand your code all that well, but does is this close to correct?
from datetime import datetime
from dataclasses import dataclass
import pandas as pd


@dataclass(order=True)
class Match:
    """Better than dictionary or lists for organizing data."""
    date: datetime
    player1: str
    player2: str
    score: float


@dataclass(order=True)
class Player:
    name: str
    rating: float = None


def load_matches(filename: str) -> tuple[list[Match], dict[str, Player]]:
    """Return list of match results and player ratings extracted from file."""
    # Collect match information in a list.
    df = pd.read_csv(filename)[["match_date", "player1", "player2", "actual_score"]]
    df["match_date"] = pd.to_datetime(df["match_date"])   # Convert timestamp to datetime
    df.sort_values(by=["match_date"])
    matches = [Match(*row) for index, row in df.iterrows()]

    # Extract initial player ratings from file.  Use first bp_score to
    # compute initial rating.
    df = pd.read_csv(filename)[["pb_date", "pb_player", "pb_score"]]
    df.dropna(subset=["pb_player"], inplace=True)   # Drop matches without bp_player
    df["pb_date"] = pd.to_datetime(df["pb_date"])   # Convert timestamp to datetime
    df.sort_values(by=["pb_date"])
    players = {}
    for index, (date, name, score) in df.iterrows():
        if score >= 1700000:
            rating = 10
        elif score >= 100000:
            rating = 5
        else:
            rating = 1
        if name not in players:
            players[name] = Player(name, rating)

    # Add other players from match data.
    for match in matches:
        if match.player1 not in players:
            players[match.player1] = Player(match.player1, 0)
        if match.player2 not in players:
            players[match.player2] = Player(match.player2, 0)

    return players, matches


def update_rating(match: Match, players: dict[str, Player]) -> None:
    if match.score < 0.5:
        players[match.player1].rating -= 1
        players[match.player2].rating += 1
    elif match.score > 0.5:
        players[match.player1].rating += 1
        players[match.player2].rating -= 1

# This does the first script. Somewhat.
players, matches = load_matches("matches_full.csv")

# This does the second script.  Well, it starts with the initial rating and
# adjusts based on match results.
for match in matches:
    update_rating(match, players)

for player in sorted(players.values(), key=lambda x: x.rating, reverse=True):
    print(player)
I left out the Glicko stuff. I don't understand it. I would make an error if I tried to implement it, and the important thing to demonstrate is how to get the initial rating from your first script and use it in your second script.
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  easy name problem Steinsack 1 1,754 Jun-16-2021, 02:03 PM
Last Post: snippsat
  Problem with very easy code. janekk9002 1 1,813 Dec-10-2020, 12:57 PM
Last Post: buran
  What was my mistake in this Python code (easy)? voltman 4 3,467 Nov-19-2019, 09:58 PM
Last Post: snippsat
  How to start with this easy problem? Fran 8 4,193 Sep-11-2018, 09:04 AM
Last Post: Fran
  Making a Easy Password/code system nmsturcke 4 3,860 Jul-09-2018, 02:50 AM
Last Post: ichabod801
  probably a easy problem for you krheigh 4 4,685 May-12-2017, 06:45 PM
Last Post: nilamo

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020