Code cleanup

This commit is contained in:
Brandon Rozek 2024-01-05 23:42:44 -05:00
parent 87986d4b2a
commit 8d964f609b
No known key found for this signature in database
GPG key ID: 26E457DA82C9F480
6 changed files with 353 additions and 353 deletions

View file

@ -1,10 +1,8 @@
"""
Author: Brandon Rozek
TODO: Argparse
Client for the WordGuess pubnix game.
"""
from datetime import datetime
from pubnix import (
run_simple_client,
send_message,
@ -12,6 +10,20 @@ from pubnix import (
)
from wordguess import WordGuess
## Messages
STARTUP_MESSAGE = lambda nc, td: f"""
Welcome to WordGuess!
The goal is to guess the word of the day.
This word is {nc} characters long.
A * character means a letter was guessed correctly,
but in the incorrect position.
Today is {td}.
"""
WIN_TEXT = lambda score: f"""
Congratulations! You solved the word of the day.
Come back tomorrow! Your score: {score}
@ -23,60 +35,58 @@ LOSE_TEXT = """
You ran out of guesses for the day. Come back tomorrow!
"""
## Game Client
class WordGuessClient:
def __init__(self):
pass
def start_game(self, client, _):
# In case the user already played today, we want to get
# information from the server on the current state
message = receive_message(client, WordGuess.GameStartMessage)
guesses_remaining = message.guesses_remaining
is_winner = message.is_winner
today = datetime.today().date()
print(f"""
Welcome to WordGuess!
The goal is to guess the word of the day.
This word is {message.num_characters} characters long.
A * character means a letter was guessed correctly,
but in the incorrect position.
Today is {today}.
""")
print(STARTUP_MESSAGE(message.num_characters, today))
if is_winner:
print(WIN_TEXT(guesses_remaining))
elif guesses_remaining > 0:
print(f"You have {guesses_remaining} guesses remaining")
print("\nTo quit, press CTRL-C.")
# Core loop if the user has more guesses remaining
try:
while not is_winner and guesses_remaining > 0:
guess = input("Guess: ")
send_message(client, WordGuess.GuessMessage(guess))
# Get response from server based on the word provided
message = receive_message(client, WordGuess.GuessResponseMessage)
if not message.valid:
print("Not a valid guess, try again.")
continue
guesses_remaining = message.guesses_remaining
is_winner = message.winner
# Display hints
print(message.hint)
print(GUESSES_REMAINING(guesses_remaining))
print("Letters Guessed:", sorted(message.letters_guessed))
if is_winner:
print(WIN_TEXT(guesses_remaining))
# Ran out of guesses, present lose text
if not is_winner:
print(LOSE_TEXT)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
w = WordGuessClient()

97
game.py
View file

@ -1,97 +0,0 @@
from collections import defaultdict
from datetime import datetime
from typing import List
SEED = 838232
WORDS = []
GAME_WORDS = "gamewords.txt"
GUESS_WORDS = "guesswords.txt"
WORD_LENGTH = 5
with open("words.txt", "r") as file:
lines = file.read().splitlines()
WORDS.extend([l for l in lines if len(l) == WORD_LENGTH])
NUMBER_OF_GUESSES = 6
TODAY = datetime.today()
WORD_OF_THE_DAY = WORDS[(TODAY.year * TODAY.month * TODAY.day * SEED) % len(WORDS) ]
def valid_guess(guess: str):
if len(guess) != len(WORD_OF_THE_DAY):
return False
# Guess needs to be part of our
# dictionary
if guess not in WORDS:
return False
return True
def char_positions(word: str):
"""
Return a dictionary with positions
of each character within a word.
Ex: "hello" -> {"h": [0], "e": [1], "l": [2, 3], "o": [4]}
"""
result = defaultdict(list)
for i, char in enumerate(word):
result[char].append(i)
return result
CHAR_POSITIONS = char_positions(WORD_OF_THE_DAY)
def compare(expected: str, guess: str) -> List[str]:
output = ["_"] * len(expected)
counted_pos = set()
# (1) Check for letters in correct positions
for i, (e_char, g_char) in enumerate(zip(expected, guess)):
if e_char == g_char:
output[i] = e_char
for i, g_char in enumerate(guess):
if g_char in expected and output[i] in ["_", "-"]:
for pos in CHAR_POSITIONS[g_char]:
if pos not in counted_pos:
output[i] = "-"
counted_pos.add(pos)
break
pass
return output
if __name__ == "__main__":
num_guesses = 0
print("""
Guess words one at a time to guess the game word.
A - character means a letter was guessed correctly,
but in the incorrect position.
To quit, press CTRL-C.
""")
# start of the user name interaction
print("_ " * WORD_LENGTH)
while True:
guess = input("Guess: ").lower()
if not valid_guess(guess):
print("Not a valid guesss")
continue
num_guesses += 1
result = compare(WORD_OF_THE_DAY, guess)
print(" ".join(result))
if guess == WORD_OF_THE_DAY:
print("You won")
break
if num_guesses >= NUMBER_OF_GUESSES:
print("You lost")
break

View file

@ -1,19 +1,21 @@
"""
WordGuess Leaderboard Viewer
Author: Brandon Rozek
View leaderboard information for a particular date
# TODO: argparse
"""
from datetime import datetime
import argparse
import sqlite3
from wordguess import WordGuess
import sqlite3
from datetime import datetime
DATE = str(datetime.today().date())
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Leaderboard for WordGuess Game")
parser.add_argument("--date", type=str, help="Filter scores by date listed in YYYY-MM-DD format.")
args = vars(parser.parse_args())
# If not specified, then use today's date
DATE = args.get("date", str(datetime.today().date()))
con = sqlite3.connect(WordGuess.RESULTS_LOCATION)
try:
cur = con.cursor()

198
pubnix.py
View file

@ -1,57 +1,94 @@
"""
Pubnix Server/Client Library
Author: Brandon Rozek
The pubnix library contains server
and client code needed to communicate
over unix domain sockets on a single
machine.
For authentication, we rely on challenge
tokens and the unix permission system as
both server and client run on the same
machine.
Remaining TODO ...
TODO: Handle a user trying to connect multiple
times at the same time.
This might be handled automatically if only one
user can play at a time...
"""
import json
import pwd
import os
import sys
import socket
import binascii
from pathlib import Path
from typing import Union
TODO: Handle timeout properly
"""
from contextlib import contextmanager
from dataclasses import dataclass
# __all__ = ['send_message', 'MESSAGE_BUFFER_LEN', 'ChallengeMessage']
from pathlib import Path
from typing import Union
import binascii
import json
import os
import pwd
import sys
import socket
MESSAGE_BUFFER_LEN = 1024
TOKEN_LENGTH = 50
TIMEOUT = 5 * 60 # 5 minutes
###
# Server
###
def run_simple_server(address, fn, force_auth=True):
"""
This function can act as the main entrypoint
for the server. It takes a function that interacts
with a connected user (potentially authenticated)
Example
=======
if __name__ == "__main__":
run_simple_server(
"/home/project/.pubnix.sock",
lambda connection, user: connection.sendall(f"Hello {user}".encode())
)
"""
with start_server(address) as sock:
print("Started server at", address)
try:
while True:
with client_connection(sock) as connection:
user = None
if force_auth:
user = authenticate(connection)
receive_message(connection, StartMessage)
fn(connection, user)
except KeyboardInterrupt:
print("Stopping server...")
@contextmanager
def start_client(address):
# Create the Unix socket client
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Connect to the server
try:
client.connect(address)
except FileNotFoundError:
print("Game server is not running at location", address)
def start_server(address):
"""
Opens up a unix domain socket at the specified address
and listens for connections.
"""
if os.path.exists(address):
print(f"{address} exists -- server already running")
sys.exit(1)
# Create a unix domain socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(TIMEOUT)
sock.bind(address)
sock.listen()
try:
yield client
yield sock
finally:
client.close()
def run_simple_client(address, fn, force_auth=True):
with start_client(address) as client:
if force_auth:
user = login(client)
send_message(client, StartMessage())
fn(client, user)
# Delete game.sock when finished
os.unlink(address)
@contextmanager
def client_connection(sock):
@ -68,29 +105,6 @@ def client_connection(sock):
finally: # clean up the connection
connection.close()
def run_simple_server(address, fn, force_auth=True):
with start_server(address) as sock:
print("Started server at", address)
try:
while True:
with client_connection(sock) as connection:
user = None
if force_auth:
user = authenticate(connection)
receive_message(connection, StartMessage)
fn(connection, user)
except KeyboardInterrupt:
print("Stopping server...")
def generate_token(length):
# From https://stackoverflow.com/a/41354711
return binascii.hexlify(os.urandom(length // 2)).decode()
def find_owner(path: Union[str, Path]) -> str:
return Path(path).owner()
def generate_challenge(user):
return ChallengeMessage(
username=user,
@ -132,7 +146,55 @@ def authenticate(connection):
send_message(connection, AuthSuccessMessage())
return user
MESSAGE_BUFFER_LEN = 1024
def generate_token(length):
# From https://stackoverflow.com/a/41354711
return binascii.hexlify(os.urandom(length // 2)).decode()
def find_owner(path: Union[str, Path]) -> str:
return Path(path).owner()
###
# Client
###
def run_simple_client(address, fn, force_auth=True):
"""
This function can act as the main entrypoint
for the client. It takes a function that interacts
with the server. If force_auth is enabled, then it
first authenticates as the effect user running the
program.
Example
=======
if __name__ == "__main__":
run_simple_client(
"/home/project/.pubnix.sock",
lambda connection, user: connection.sendall(f"Hello server, I'm {user}".encode())
)
"""
with start_client(address) as client:
if force_auth:
user = login(client)
send_message(client, StartMessage())
fn(client, user)
@contextmanager
def start_client(address):
# Create the Unix socket client
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Connect to the server
try:
client.connect(address)
except FileNotFoundError:
print("Server is not running at location", address)
sys.exit(1)
try:
yield client
finally:
client.close()
def login(connection):
# Send authentication message
@ -150,14 +212,17 @@ def login(connection):
# Tell server to check the challenge file
send_message(connection, ValidationMessage())
# On success, delete challenge file
try:
message = receive_message(connection, AuthSuccessMessage)
finally:
# Delete challenge file
os.unlink(challenge.location)
return user
##
# Messages
##
class MessageEncoder(json.JSONEncoder):
def default(self, o):
@ -183,25 +248,6 @@ def receive_message(connection, cls=None):
return message
@contextmanager
def start_server(address):
if os.path.exists(address):
print("game.sock exists -- game server already running")
sys.exit(1)
# Create a unix domain socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(TIMEOUT)
sock.bind(address)
sock.listen()
try:
yield sock
finally:
# Delete game.sock when finished
os.unlink(address)
class ProtocolException(Exception):
pass
@ -210,7 +256,6 @@ def close_with_error(connection, content: str):
connection.sendall(json.dumps(message).encode())
raise ProtocolException()
@dataclass
class ChallengeMessage:
username: str
@ -242,7 +287,6 @@ class AuthSuccessMessage:
def __post_init__(self):
assert self.type == "authentication_success"
@dataclass
class StartMessage:
action: str = "start"

342
server.py
View file

@ -1,32 +1,196 @@
"""
WordGuess Pubnix Game Server
Author: Brandon Rozek
WordGuess game server
TODO: argparse
TODO: Multiple users trying to access it at same time?
TODO: Fix timeout issue
"""
from collections import defaultdict
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import List
import argparse
import os
import pickle
import random
import sqlite3
from wordguess import WordGuess
from pubnix import (
run_simple_server,
receive_message,
send_message
)
from collections import defaultdict
from wordguess import WordGuess
from typing import List
import os
import pickle
from pathlib import Path
import sqlite3
SEED = 838211
SAVE_LOCATION = "state.pickle"
class WordGuessServer:
def __init__(self, seed, word_length = 5, guesses_allowed = 6):
self.seed = seed
self.word_length = word_length
self.guesses_allowed = guesses_allowed
# date -> user str -> int
self.guesses_made = defaultdict(make_default_dict_zero)
# date -> user str -> bool
self.is_winner = defaultdict(make_default_dict_false)
# date -> user str -> set[char]
self.letters_guessed = defaultdict(make_default_dict_set)
def fix_permissions(self):
"""
Discourage cheating
by making some files unreadable
and some unwritable.
"""
# 33152 = '-rw-------.'
# 33188 = '-rw-r--r--.'
os.chmod(__file__, 33152)
os.chmod("pubnix.py", 33188)
os.chmod("wordguess.py", 33188)
os.chmod("words.txt", 33188)
os.chmod("client.py", 33188)
Path(WordGuess.RESULTS_LOCATION).touch(33188)
Path(SAVE_LOCATION).touch(33152)
def game(self, connection, user):
"""
Start a game of Word Guess for
a given user with a specific connection.
"""
# As long as the connection is alive,
# treat the time the same
# NOTE: Timeout specified in pubnix.py
today = datetime.today().date()
wotd = self.get_wotd(today)
# Load from internal state, whether the user
# has already won or made guesses
gr = self.guesses_allowed - self.guesses_made[today][user]
is_winner = self.is_winner[today][user]
send_message(connection, WordGuess.GameStartMessage(is_winner, self.word_length, gr, list(self.letters_guessed[today][user])))
# No need to play if the user already won today.
if self.is_winner[today][user]:
return
while not self.is_winner[today][user] and self.guesses_made[today][user] < self.guesses_allowed:
message = receive_message(connection, WordGuess.GuessMessage)
message.word = message.word.lower()
# If the user made an invalid guess, don't
# provide a hint or count it against them.
if not self.valid_guess(message.word):
send_message(
connection,
WordGuess.GuessResponseMessage(
gr,
False,
False,
"",
list(self.letters_guessed[today][user])
)
)
continue
self.is_winner[today][user] = message.word == wotd
if not self.is_winner[today][user]:
self.guesses_made[today][user] += 1
gr = self.guesses_allowed - self.guesses_made[today][user]
hint = WordGuessServer.compare(wotd, message.word)
# Populate letters guessed
for c in message.word:
self.letters_guessed[today][user].add(c)
send_message(
connection,
WordGuess.GuessResponseMessage(
gr,
True,
self.is_winner[today][user],
hint,
list(self.letters_guessed[today][user])
)
)
if self.is_winner[today][user]:
WordGuessServer.save_record(today, user, gr)
@staticmethod
def save_record(date, username, score):
"""
Save score that user acheived into
results database for leaderboard viewing.
"""
con = sqlite3.connect(WordGuess.RESULTS_LOCATION)
try:
cur = con.cursor()
cur.execute(
"CREATE TABLE IF NOT EXISTS scores(user TEXT NOT NULL, score INT NOT NULL, date TIMESTAMP NOT NULL, PRIMARY KEY (user, date))"
)
cur.execute("INSERT INTO scores VALUES (?, ?, ?)", (username, score, date))
con.commit()
except sqlite3.IntegrityError:
print("Cannot write record:", (date, username, score))
finally:
con.close()
@lru_cache
def get_words(self):
words = []
with open("words.txt", "r") as file:
lines = file.read().splitlines()
words.extend([l for l in lines if len(l) == self.word_length])
return words
def get_wotd(self, day):
words = self.get_words()
index = (day.year * day.month * day.day * self.seed) % len(words)
return words[index]
def valid_guess(self, guess: str):
"""
Determine if a guess is valid,
as invalid guesses don't
get hints or impact guess counts.
"""
if len(guess) != self.word_length:
return False
# Guess needs to be part of our
# dictionary
if guess not in self.get_words():
return False
return True
@staticmethod
def compare(expected: str, guess: str) -> List[str]:
"""
Provide a hint to the user based on the word
guessed and the expected word of the day.
"""
output = ["_"] * len(expected)
counted_pos = set()
# Check for letters in correct positions
for i, (e_char, g_char) in enumerate(zip(expected, guess)):
if e_char == g_char:
output[i] = e_char
gchar_pos = char_positions(expected)
# Mark leters that are correct but are not in the
# specified location
for i, g_char in enumerate(guess):
if g_char in expected and output[i] in ["_", "*"]:
for pos in gchar_pos[g_char]:
if pos not in counted_pos:
output[i] = "*"
counted_pos.add(pos)
break
return output
@lru_cache
def char_positions(word: str):
@ -55,142 +219,17 @@ def make_default_dict_false():
def make_default_dict_set():
return defaultdict(set)
class WordGuessServer:
def __init__(self, seed, word_length = 5, guesses_allowed = 6):
self.seed = seed
self.word_length = word_length
self.guesses_allowed = guesses_allowed
# date -> user str -> int
self.guesses_made = defaultdict(make_default_dict_zero)
# date -> user str -> bool
self.is_winner = defaultdict(make_default_dict_false)
# date -> user str -> set[char]
self.letters_guessed = defaultdict(make_default_dict_set)
def fix_permissions(self):
# 33152 = '-rw-------.'
# 33188 = '-rw-r--r--.'
os.chmod(__file__, 33152)
os.chmod("pubnix.py", 33188)
os.chmod("wordguess.py", 33188)
os.chmod("words.txt", 33188)
os.chmod("client.py", 33188)
Path(WordGuess.RESULTS_LOCATION).touch(33188)
Path(SAVE_LOCATION).touch(33152)
@staticmethod
def save_record(date, username, score):
con = sqlite3.connect(WordGuess.RESULTS_LOCATION)
try:
cur = con.cursor()
cur.execute(
"CREATE TABLE IF NOT EXISTS scores(user TEXT NOT NULL, score INT NOT NULL, date TIMESTAMP NOT NULL, PRIMARY KEY (user, date))"
)
cur.execute("INSERT INTO scores VALUES (?, ?, ?)", (username, score, date))
con.commit()
except sqlite3.IntegrityError:
print("Cannot write record:", (date, username, score))
finally:
con.close()
@lru_cache
def get_words(self):
words = []
game_words = "words.txt"
# guess_words = "words.txt"
with open(game_words, "r") as file:
lines = file.read().splitlines()
words.extend([l for l in lines if len(l) == self.word_length])
return words
def get_wotd(self, day):
words = self.get_words()
index = (day.year * day.month * day.day * self.seed) % len(words)
return words[index]
def valid_guess(self, guess: str):
if len(guess) != self.word_length:
return False
# Guess needs to be part of our
# dictionary
if guess not in self.get_words():
return False
return True
@staticmethod
def compare(expected: str, guess: str) -> List[str]:
output = ["_"] * len(expected)
counted_pos = set()
# (1) Check for letters in correct positions
for i, (e_char, g_char) in enumerate(zip(expected, guess)):
if e_char == g_char:
output[i] = e_char
gchar_pos = char_positions(expected)
for i, g_char in enumerate(guess):
if g_char in expected and output[i] in ["_", "*"]:
for pos in gchar_pos[g_char]:
if pos not in counted_pos:
output[i] = "*"
counted_pos.add(pos)
break
return output
def game(self, connection, user):
# As long as the connection is alive,
# treat the time the same
# NOTE: Timeout does exist in pubnix.py
today = datetime.today().date()
wotd = self.get_wotd(today)
gr = self.guesses_allowed - self.guesses_made[today][user]
is_winner = self.is_winner[today][user]
send_message(connection, WordGuess.GameStartMessage(is_winner, self.word_length, gr, list(self.letters_guessed[today][user])))
if self.is_winner[today][user]:
return
while not self.is_winner[today][user] and self.guesses_made[today][user] < self.guesses_allowed:
message = receive_message(connection, WordGuess.GuessMessage)
message.word = message.word.lower()
if not self.valid_guess(message.word):
send_message(connection, WordGuess.GuessResponseMessage(gr))
continue
self.is_winner[today][user] = message.word == wotd
if not self.is_winner[today][user]:
self.guesses_made[today][user] += 1
gr = self.guesses_allowed - self.guesses_made[today][user]
result = WordGuessServer.compare(wotd, message.word)
# Populate letters guessed
for c in message.word:
self.letters_guessed[today][user].add(c)
send_message(
connection,
WordGuess.GuessResponseMessage(
gr,
True,
self.is_winner[today][user],
result,
list(self.letters_guessed[today][user])
)
)
if self.is_winner[today][user]:
WordGuessServer.save_record(today, user, gr)
SAVE_LOCATION = "state.pickle"
if __name__ == "__main__":
# NOTE: The seed must be kept secret otherwise
# players can cheat!
SEED = random.randint(3, 1000000)
print("Seed: ", SEED)
w = WordGuessServer(SEED)
# Load data structure if existent
# Load game state if existent
if os.path.exists(SAVE_LOCATION):
with open(SAVE_LOCATION, "rb") as file:
w = pickle.load(file)
@ -200,10 +239,11 @@ if __name__ == "__main__":
# to prevent cheating...
w.fix_permissions()
# Start game server
try:
run_simple_server(WordGuess.ADDRESS, w.game)
finally:
# Save game data structure
# After finishing, save game state
print("Saving game state... ", end="")
with open(SAVE_LOCATION, "wb") as file:
pickle.dump(w, file)

View file

@ -1,4 +1,5 @@
"""
WordGuess Library
Author: Brandon Rozek
Contains common data structures