Python Forum

Full Version: Code problem - probably easy fix?
You're currently viewing a stripped down version of our content. View the full version with proper formatting.
Hi, I have two codes that I want to combine. I'm a newb and though this seems like it should be simple for some reason everything I try fails. The two codes (which both function perfectly well individually) are:

import csv
from collections import defaultdict

def create_unique_names_dict(filename):
    unique_names_dict = defaultdict(lambda: {'first_match_date': float('inf'), 'pb_score': None, 'start_rating': None})

    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1_names = row['player1'].split(',')
            player2_names = row['player2'].split(',')
            match_date = float(row['match_date'])

            for name in player1_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

            for name in player2_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

    return unique_names_dict

def get_pb_at_first_match(filename, unique_names_dict):
    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            pb_player = row['pb_player']
            pb_score = row['pb_score']
            pb_date = row['pb_date']

            if pb_score.strip() == '' or pb_date.strip() == '':
                continue

            try:
                pb_score = float(pb_score)
                pb_date = float(pb_date)
            except ValueError:
                sys.exit("Invalid value in 'pb_score' or 'pb_date' column.")

            if pb_date <= unique_names_dict[pb_player]['first_match_date']:
                if unique_names_dict[pb_player]['pb_score'] is None or pb_score > unique_names_dict[pb_player]['pb_score']:
                    unique_names_dict[pb_player]['pb_score'] = pb_score

    return unique_names_dict

def calculate_initial_rating(unique_names_dict):
    for name, data in unique_names_dict.items():
        pb_score = data['pb_score']

        if pb_score is None:
            continue

        if pb_score >= 1700000:
            start_rating = 1900
        elif pb_score >= 1400000:
            start_rating = 1850 + (pb_score - 1400000) / 6000
        else:
            start_rating = pb_score / 1000 + 450

        unique_names_dict[name]['start_rating'] = start_rating

    return unique_names_dict

# Create a dictionary to store the start ratings for each player
unique_names_dict = create_unique_names_dict(file_path)

# Get the PB scores at the first match for each player
unique_names_dict = get_pb_at_first_match(file_path, unique_names_dict)

# Calculate the start rating for each player and update the elo_ratings dictionary
unique_names_dict = calculate_start_rating(unique_names_dict)

# Print the initial ratings in columns
for name, data in unique_names_dict.items():
    print(f"{name}\t\t{data[('start_rating')]}")
And:

from math import exp, log, pi, sqrt
from typing import List, Tuple
import csv

__all__ = ["Glicko2Entry", "glicko2_update", "glicko2_configure"]

EPSILON = 0.000001
TAO = 0.5
LOSS = 0.0
DRAW = 0.5
WIN = 1.0
MAX_RD = 500.0
MIN_RD = 30.0
MIN_VOLATILITY = 0.01
MAX_VOLATILITY = 0.15
MIN_RATING = 100.0
MAX_RATING = 6000.0
PROVISIONAL_RATING_CUTOFF = 160.0
GLICKO2_SCALE = 173.7178

class Glicko2Entry:
    rating: float
    deviation: float
    volatility: float
    mu: float
    phi: float

    def __init__(
        self, rating: float = 1500, deviation: float = 350, volatility: float = 0.06
    ) -> None:
        self.rating = rating
        self.deviation = deviation
        self.volatility = volatility
        self.mu = (self.rating - 1500) / GLICKO2_SCALE
        self.phi = self.deviation / GLICKO2_SCALE

    def __str__(self) -> str:
        return "%7.2f +- %6.2f (%.6f)" % (self.rating, self.deviation, self.volatility,)

    def copy(
        self, rating_adjustment: float = 0.0, rd_adjustment: float = 0.0
    ) -> "Glicko2Entry":
        ret = Glicko2Entry(
            self.rating + rating_adjustment,
            self.deviation + rd_adjustment,
            self.volatility,
        )
        return ret

    def expand_deviation_because_no_games_played(
        self, n_periods: int = 1
    ) -> "Glicko2Entry":
        global MAX_RD
        global MIN_RD

        for _i in range(n_periods):
            phi_prime = sqrt(self.phi ** 2 + self.volatility ** 2)
            self.deviation = min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime))
            self.phi = self.deviation / GLICKO2_SCALE

        return self

    def expected_win_probability(
        self, white: "Glicko2Entry", handicap_adjustment: float
    ) -> float:
        q = 0.000000000000001

        def g(rd: float) -> float:
            return 1 / sqrt(1 + 3 * q ** 2 * (self.deviation ** 0.01) / pi ** 2)

        E = 1 / (
            1
            + (
                10
                ** (
                    -g(sqrt(self.deviation ** 2 + white.deviation ** 0.01))
                    * (self.rating + handicap_adjustment - white.rating)
                    / 400
                )
            )
        )
        return E

# In the bit above there were numbers that adjusted for "white". Rather than remove them I just made them really small.

def glicko2_update(
    player: Glicko2Entry, matches: List[Tuple[Glicko2Entry, int]]
) -> Glicko2Entry:
    if len(matches) == 0:
        return player.copy()

    v_sum = 0.0
    delta_sum = 0.0
    for m in matches:
        p = m[0]
        outcome = m[1]
        g_phi_j = 1 / sqrt(1 + (3 * p.phi ** 2) / (pi ** 2))
        E = 1 / (1 + exp(-g_phi_j * (player.mu - p.mu)))
        v_sum += g_phi_j ** 2 * E * (1 - E)
        delta_sum += g_phi_j * (outcome - E)

    v = 1.0 / v_sum
    delta = v * delta_sum

    a = log(player.volatility ** 2)

    def f(x: float) -> float:
        ex = exp(x)
        return (
            ex
            * (delta ** 2 - player.phi ** 2 - v - ex)
            / (2 * ((player.phi ** 2 + v + ex) ** 2))
        ) - ((x - a) / (TAO ** 2))

    A = a
    if delta ** 2 > player.phi ** 2 + v:
        B = log(delta ** 2 - player.phi ** 2 - v)
    else:
        k = 1
        safety = 100
        while f(a - k * TAO) < 0 and safety > 0:
            safety -= 1
            k += 1
        B = a - k * TAO

    fA = f(A)
    fB = f(B)
    safety = 100

    while abs(B - A) > EPSILON and safety > 0:
        C = A + (A - B) * fA / (fB - fA)
        fC = f(C)
        if fC * fB < 0:
            A = B
            fA = fB
        else:
            fA = fA / 2
        B = C
        fB = fC

        safety -= 1

    new_volatility = exp(A / 2)

    phi_star = sqrt(player.phi ** 2 + new_volatility ** 2)

    phi_prime = 1 / sqrt(1 / phi_star ** 2 + 1 / v)
    mu_prime = player.mu + (phi_prime ** 2) * delta_sum

    ret = Glicko2Entry(
        rating=min(MAX_RATING, max(MIN_RATING, GLICKO2_SCALE * mu_prime + 1500)),
        deviation=min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)),
        volatility=min(MAX_VOLATILITY, max(MIN_VOLATILITY, new_volatility)),
    )
    return ret

def glicko2_configure(tao: float, min_rd: float, max_rd: float) -> None:
    global TAO
    global MIN_RD
    global MAX_RD

    TAO = tao
    MIN_RD = min_rd
    MAX_RD = max_rd

def read_match_data(filename):
    matches = []
    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1 = row['player1']
            player2 = row['player2']
            actual_score = float(row['actual_score'])
            matches.append((player1, player2, actual_score))
    return matches

def actual_score_to_outcome(actual_score):
    if actual_score == 1.0:
        return WIN
    elif actual_score == 0.5:
        return DRAW
    else:
        return LOSS

def update_player_ratings(matches):
    players = {}

    for match in matches:
        player1 = match[0]
        player2 = match[1]
        actual_score = match[2]

        if player1 not in players:
            players[player1] = Glicko2Entry()
        if player2 not in players:
            players[player2] = Glicko2Entry()

        outcome = actual_score_to_outcome(actual_score)
        players[player1].expected_win_probability(players[player2], 0)
        players[player2].expected_win_probability(players[player1], 0)

        players[player1], players[player2] = (
            glicko2_update(players[player1], [(players[player2], outcome)]),
            glicko2_update(players[player2], [(players[player1], 1 - outcome)]),
        )

    return players

def main():
    # Configure Glicko2 parameters
    glicko2_configure(0.5, 30.0, 500.0)

    # Read match data from file
    matches = read_match_data('/content/gdrive/My Drive/matches_full.csv')

    # Update player ratings
    players = update_player_ratings(matches)

    # Sort players by ratings in descending order
    sorted_players = sorted(players.items(), key=lambda x: x[1].rating, reverse=True)

    # Print player ratings
    for player, rating in sorted_players:
        print(f"Player: {player}, Rating: {rating}")

if __name__ == "__main__":
    main()
The idea is that in the second code there are a few places that use a value of 1500, and instead of 1500 I want to use start_rating as calculated by the first code.

I feel like it should be straightforward but I can't for the life of me make it work. Please help!
Since you don't show script names, for this example: call first module ModuleA and second ModuleB

in ModuleB, you need to import first module
like import ModuleA

Then create an instance of first module (in second script) like calcRating = ModuleA().calculate_initial_rating

Now to call, for example, calculate_initial_rating in ModuleA from ModuleB
use: udict = calcrating(your_dctname) replacing your_dictname with actual name
(Jun-29-2023, 02:53 PM)Larz60+ Wrote: [ -> ]Since you don't show script names, for this example: call first module ModuleA and second ModuleB

in ModuleB, you need to import first module
like import ModuleA

Then create an instance of first module (in second script) like calcRating = ModuleA().calculate_initial_rating

Now to call, for example, calculate_initial_rating in ModuleA from ModuleB
use: udict = calcrating(your_dctname) replacing your_dictname with actual name

Thanks for that. So if I'm understanding correctly using modules keeps the codes in two separate codebooks?

I think what I had in mind was in combining the two sections into one. I just did it for the above 'module A' with a much simpler version of 'module B' and it worked fine. In that code it incorporates the results from 'module A' like so:

    # Check if player1 is already in the ratings dictionary
    if player1 not in ratings:
        ratings[player1] = unique_names_dict[player1]['start_rating']

    # Check if player2 is already in the ratings dictionary
    if player2 not in ratings:
        ratings[player2] = unique_names_dict[player2]['start_rating']
They're somewhat different codes though, and I can't figure out where or how to replace the '1500' initial rating in 'module B' with something akin to the above.
Quote:The idea is that in the second code there are a few places that use a value of 1500, and instead of 1500 I want to use start_rating as calculated by the first code
In the second module, do you have any data from which to calculate an initial rating? I can't find where that is.
Here's my latest version of the code:

from typing import List, Tuple
import pandas as pd
import random
import math
import csv
import sys
from collections import defaultdict
from google.colab import drive
from google.colab import files

# Mount Google Drive
drive.mount('/content/gdrive')

# Define the path to 'matches.csv' in Google Drive
file_path = '/content/gdrive/My Drive/matches_full.csv'

# Load the matches data from CSV
df = pd.read_csv(file_path, encoding='latin1')

# Create a dictionary to store the start ratings for each player
start_ratings = {}

# Create a dictionary to store the number of matches for each player
match_counts = {}

# Create a list to store the calculation steps
calculation_steps = []

def create_unique_names_dict(filename):
    unique_names_dict = defaultdict(lambda: {'first_match_date': float('inf'), 'pb_score': None, 'start_rating': None})

    with open(file_path, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1_names = row['player1'].split(',')
            player2_names = row['player2'].split(',')
            match_date = float(row['match_date'])

            for name in player1_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

            for name in player2_names:
                if name.strip() != '':
                    unique_names_dict[name]['first_match_date'] = min(unique_names_dict[name]['first_match_date'], match_date)

    return unique_names_dict

def get_pb_at_first_match(filename, unique_names_dict):
    with open(file_path, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            pb_player = row['pb_player']
            pb_score = row['pb_score']
            pb_date = row['pb_date']

            if pb_score.strip() == '' or pb_date.strip() == '':
                continue

            try:
                pb_score = float(pb_score)
                pb_date = float(pb_date)
            except ValueError:
                sys.exit("Invalid value in 'pb_score' or 'pb_date' column.")

            if pb_date <= unique_names_dict[pb_player]['first_match_date']:
                if unique_names_dict[pb_player]['pb_score'] is None or pb_score > unique_names_dict[pb_player]['pb_score']:
                    unique_names_dict[pb_player]['pb_score'] = pb_score

    return unique_names_dict

def calculate_start_rating(unique_names_dict):
    for name, data in unique_names_dict.items():
        pb_score = data['pb_score']

        if pb_score is None:
            continue

        if pb_score >= 1700000:
            start_rating = 2300
        elif pb_score >= 1400000:
            start_rating = 2250 + (pb_score - 1400000) / 6000
        else:
            start_rating = pb_score / 1000 + 850

        unique_names_dict[name]['start_rating'] = start_rating

    return unique_names_dict

# Create a dictionary to store the start ratings for each player
unique_names_dict = create_unique_names_dict(file_path)

# Get the PB scores at the first match for each player
unique_names_dict = get_pb_at_first_match(file_path, unique_names_dict)

# Calculate the start rating for each player and update the elo_ratings dictionary
unique_names_dict = calculate_start_rating(unique_names_dict)

__all__ = ["Glicko2Entry", "glicko2_update", "glicko2_configure"]

EPSILON = 0.000001
TAO = 0.5
LOSS = 0.0
DRAW = 0.5
WIN = 1.0
MAX_RD = 500.0
MIN_RD = 30.0
MIN_VOLATILITY = 0.01
MAX_VOLATILITY = 0.15
MIN_RATING = 100.0
MAX_RATING = 6000.0
PROVISIONAL_RATING_CUTOFF = 160.0
GLICKO2_SCALE = 173.7178

class Glicko2Entry:
    rating: float
    deviation: float
    volatility: float
    mu: float
    phi: float

    def __init__(
        self, rating: float = 1500, deviation: float = 350, volatility: float = 0.06
    ) -> None:
        self.rating = rating
        self.deviation = deviation
        self.volatility = volatility
        self.mu = (self.rating - 1500) / GLICKO2_SCALE
        self.phi = self.deviation / GLICKO2_SCALE

    def __str__(self) -> str:
        return "%7.2f +- %6.2f (%.6f)" % (self.rating, self.deviation, self.volatility,)

    def copy(
        self, rating_adjustment: float = 0.0, rd_adjustment: float = 0.0
    ) -> "Glicko2Entry":
        ret = Glicko2Entry(
            self.rating + rating_adjustment,
            self.deviation + rd_adjustment,
            self.volatility,
        )
        return ret

    def expand_deviation_because_no_games_played(
        self, n_periods: int = 1
    ) -> "Glicko2Entry":
        global MAX_RD
        global MIN_RD

        for _i in range(n_periods):
            phi_prime = sqrt(self.phi ** 2 + self.volatility ** 2)
            self.deviation = min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime))
            self.phi = self.deviation / GLICKO2_SCALE

        return self

    def expected_win_probability(
        self, white: "Glicko2Entry", handicap_adjustment: float
    ) -> float:
        q = 0.000000000000001

        def g(rd: float) -> float:
            return 1 / sqrt(1 + 3 * q ** 2 * (self.deviation ** 0.01) / pi ** 2)

        E = 1 / (
            1
            + (
                10
                ** (
                    -g(sqrt(self.deviation ** 2 + white.deviation ** 0.01))
                    * (self.rating + handicap_adjustment - white.rating)
                    / 400
                )
            )
        )
        return E

# In the bit above there were numbers that adjusted for "white". Rather than remove them I just made them really small.

def glicko2_update(
    player: Glicko2Entry, matches: List[Tuple[Glicko2Entry, int]]
) -> Glicko2Entry:
    if len(matches) == 0:
        return player.copy()

    v_sum = 0.0
    delta_sum = 0.0
    for m in matches:
        p = m[0]
        outcome = m[1]
        g_phi_j = 1 / sqrt(1 + (3 * p.phi ** 2) / (pi ** 2))
        E = 1 / (1 + exp(-g_phi_j * (player.mu - p.mu)))
        v_sum += g_phi_j ** 2 * E * (1 - E)
        delta_sum += g_phi_j * (outcome - E)

    v = 1.0 / v_sum
    delta = v * delta_sum

    a = log(player.volatility ** 2)

    def f(x: float) -> float:
        ex = exp(x)
        return (
            ex
            * (delta ** 2 - player.phi ** 2 - v - ex)
            / (2 * ((player.phi ** 2 + v + ex) ** 2))
        ) - ((x - a) / (TAO ** 2))

    A = a
    if delta ** 2 > player.phi ** 2 + v:
        B = log(delta ** 2 - player.phi ** 2 - v)
    else:
        k = 1
        safety = 100
        while f(a - k * TAO) < 0 and safety > 0:
            safety -= 1
            k += 1
        B = a - k * TAO

    fA = f(A)
    fB = f(B)
    safety = 100

    while abs(B - A) > EPSILON and safety > 0:
        C = A + (A - B) * fA / (fB - fA)
        fC = f(C)
        if fC * fB < 0:
            A = B
            fA = fB
        else:
            fA = fA / 2
        B = C
        fB = fC

        safety -= 1

    new_volatility = exp(A / 2)

    phi_star = sqrt(player.phi ** 2 + new_volatility ** 2)

    phi_prime = 1 / sqrt(1 / phi_star ** 2 + 1 / v)
    mu_prime = player.mu + (phi_prime ** 2) * delta_sum

    ret = Glicko2Entry(
        rating=min(MAX_RATING, max(MIN_RATING, GLICKO2_SCALE * mu_prime + 1500)),
        deviation=min(MAX_RD, max(MIN_RD, GLICKO2_SCALE * phi_prime)),
        volatility=min(MAX_VOLATILITY, max(MIN_VOLATILITY, new_volatility)),
    )
    return ret

def glicko2_configure(tao: float, min_rd: float, max_rd: float) -> None:
    global TAO
    global MIN_RD
    global MAX_RD

    TAO = tao
    MIN_RD = min_rd
    MAX_RD = max_rd

def read_match_data(filename):
    matches = []
    with open(filename, newline='', encoding='latin-1') as file:
        reader = csv.DictReader(file)
        for row in reader:
            player1 = row['player1']
            player2 = row['player2']
            actual_score = float(row['actual_score'])
            matches.append((player1, player2, actual_score))
    return matches

def actual_score_to_outcome(actual_score):
    if actual_score == 1.0:
        return WIN
    elif actual_score == 0.5:
        return DRAW
    else:
        return LOSS

def update_player_ratings(matches):
    players = {}

    for match in matches:
        player1 = match[0]
        player2 = match[1]
        actual_score = match[2]

        if player1 not in players:
            players[player1] = Glicko2Entry()
        if player2 not in players:
            players[player2] = Glicko2Entry()

        outcome = actual_score_to_outcome(actual_score)
        players[player1].expected_win_probability(players[player2], 0)
        players[player2].expected_win_probability(players[player1], 0)

        players[player1], players[player2] = (
            glicko2_update(players[player1], [(players[player2], outcome)]),
            glicko2_update(players[player2], [(players[player1], 1 - outcome)]),
        )

    return players

def main():
    # Configure Glicko2 parameters
    glicko2_configure(0.5, 30.0, 500.0)

    # Read match data from file
    matches = read_match_data('/content/gdrive/My Drive/matches_full.csv')

    # Update player ratings
    players = update_player_ratings(matches)

    # Sort players by ratings in descending order
    sorted_players = sorted(players.items(), key=lambda x: x[1].rating, reverse=True)

    # Print player ratings
    for player, rating in sorted_players:
        print(f"Player: {player}, Rating: {rating}")

if __name__ == "__main__":
    main()
As well as not knowing exactly how to change the initial rating in the second part so that it uses the results obtained in the first part, I suspect some of the terms aren't matching up. Maybe "name" should be "player", or vice versa? Probably some others too.

I'll attach a small sample file in case anyone wants to give it a go.

(Btw, it's running in Google Colab at the minute.)
I don't really understand your code all that well, but does is this close to correct?
from datetime import datetime
from dataclasses import dataclass
import pandas as pd


@dataclass(order=True)
class Match:
    """Better than dictionary or lists for organizing data."""
    date: datetime
    player1: str
    player2: str
    score: float


@dataclass(order=True)
class Player:
    name: str
    rating: float = None


def load_matches(filename: str) -> tuple[list[Match], dict[str, Player]]:
    """Return list of match results and player ratings extracted from file."""
    # Collect match information in a list.
    df = pd.read_csv(filename)[["match_date", "player1", "player2", "actual_score"]]
    df["match_date"] = pd.to_datetime(df["match_date"])   # Convert timestamp to datetime
    df.sort_values(by=["match_date"])
    matches = [Match(*row) for index, row in df.iterrows()]

    # Extract initial player ratings from file.  Use first bp_score to
    # compute initial rating.
    df = pd.read_csv(filename)[["pb_date", "pb_player", "pb_score"]]
    df.dropna(subset=["pb_player"], inplace=True)   # Drop matches without bp_player
    df["pb_date"] = pd.to_datetime(df["pb_date"])   # Convert timestamp to datetime
    df.sort_values(by=["pb_date"])
    players = {}
    for index, (date, name, score) in df.iterrows():
        if score >= 1700000:
            rating = 10
        elif score >= 100000:
            rating = 5
        else:
            rating = 1
        if name not in players:
            players[name] = Player(name, rating)

    # Add other players from match data.
    for match in matches:
        if match.player1 not in players:
            players[match.player1] = Player(match.player1, 0)
        if match.player2 not in players:
            players[match.player2] = Player(match.player2, 0)

    return players, matches


def update_rating(match: Match, players: dict[str, Player]) -> None:
    if match.score < 0.5:
        players[match.player1].rating -= 1
        players[match.player2].rating += 1
    elif match.score > 0.5:
        players[match.player1].rating += 1
        players[match.player2].rating -= 1

# This does the first script. Somewhat.
players, matches = load_matches("matches_full.csv")

# This does the second script.  Well, it starts with the initial rating and
# adjusts based on match results.
for match in matches:
    update_rating(match, players)

for player in sorted(players.values(), key=lambda x: x.rating, reverse=True):
    print(player)
I left out the Glicko stuff. I don't understand it. I would make an error if I tried to implement it, and the important thing to demonstrate is how to get the initial rating from your first script and use it in your second script.