Initial draft

This commit is contained in:
Brandon Rozek 2024-01-05 22:30:58 -05:00
commit 87986d4b2a
No known key found for this signature in database
GPG key ID: 26E457DA82C9F480
8 changed files with 6457 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
state.pickle
results.db
__pycache__

83
client.py Normal file
View file

@ -0,0 +1,83 @@
"""
Author: Brandon Rozek
TODO: Argparse
"""
from datetime import datetime
from pubnix import (
run_simple_client,
send_message,
receive_message
)
from wordguess import WordGuess
WIN_TEXT = lambda score: f"""
Congratulations! You solved the word of the day.
Come back tomorrow! Your score: {score}
"""
GUESSES_REMAINING = lambda gr: f"You have {gr} guesses remaining."
LOSE_TEXT = """
You ran out of guesses for the day. Come back tomorrow!
"""
class WordGuessClient:
def __init__(self):
pass
def start_game(self, client, _):
message = receive_message(client, WordGuess.GameStartMessage)
guesses_remaining = message.guesses_remaining
is_winner = message.is_winner
today = datetime.today().date()
print(f"""
Welcome to WordGuess!
The goal is to guess the word of the day.
This word is {message.num_characters} characters long.
A * character means a letter was guessed correctly,
but in the incorrect position.
Today is {today}.
""")
if is_winner:
print(WIN_TEXT(guesses_remaining))
elif guesses_remaining > 0:
print(f"You have {guesses_remaining} guesses remaining")
print("\nTo quit, press CTRL-C.")
try:
while not is_winner and guesses_remaining > 0:
guess = input("Guess: ")
send_message(client, WordGuess.GuessMessage(guess))
message = receive_message(client, WordGuess.GuessResponseMessage)
if not message.valid:
print("Not a valid guess, try again.")
continue
guesses_remaining = message.guesses_remaining
is_winner = message.winner
print(message.hint)
print(GUESSES_REMAINING(guesses_remaining))
print("Letters Guessed:", sorted(message.letters_guessed))
if is_winner:
print(WIN_TEXT(guesses_remaining))
if not is_winner:
print(LOSE_TEXT)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
w = WordGuessClient()
run_simple_client(WordGuess.ADDRESS, w.start_game)

97
game.py Normal file
View file

@ -0,0 +1,97 @@
from collections import defaultdict
from datetime import datetime
from typing import List
SEED = 838232
WORDS = []
GAME_WORDS = "gamewords.txt"
GUESS_WORDS = "guesswords.txt"
WORD_LENGTH = 5
with open("words.txt", "r") as file:
lines = file.read().splitlines()
WORDS.extend([l for l in lines if len(l) == WORD_LENGTH])
NUMBER_OF_GUESSES = 6
TODAY = datetime.today()
WORD_OF_THE_DAY = WORDS[(TODAY.year * TODAY.month * TODAY.day * SEED) % len(WORDS) ]
def valid_guess(guess: str):
if len(guess) != len(WORD_OF_THE_DAY):
return False
# Guess needs to be part of our
# dictionary
if guess not in WORDS:
return False
return True
def char_positions(word: str):
"""
Return a dictionary with positions
of each character within a word.
Ex: "hello" -> {"h": [0], "e": [1], "l": [2, 3], "o": [4]}
"""
result = defaultdict(list)
for i, char in enumerate(word):
result[char].append(i)
return result
CHAR_POSITIONS = char_positions(WORD_OF_THE_DAY)
def compare(expected: str, guess: str) -> List[str]:
output = ["_"] * len(expected)
counted_pos = set()
# (1) Check for letters in correct positions
for i, (e_char, g_char) in enumerate(zip(expected, guess)):
if e_char == g_char:
output[i] = e_char
for i, g_char in enumerate(guess):
if g_char in expected and output[i] in ["_", "-"]:
for pos in CHAR_POSITIONS[g_char]:
if pos not in counted_pos:
output[i] = "-"
counted_pos.add(pos)
break
pass
return output
if __name__ == "__main__":
num_guesses = 0
print("""
Guess words one at a time to guess the game word.
A - character means a letter was guessed correctly,
but in the incorrect position.
To quit, press CTRL-C.
""")
# start of the user name interaction
print("_ " * WORD_LENGTH)
while True:
guess = input("Guess: ").lower()
if not valid_guess(guess):
print("Not a valid guesss")
continue
num_guesses += 1
result = compare(WORD_OF_THE_DAY, guess)
print(" ".join(result))
if guess == WORD_OF_THE_DAY:
print("You won")
break
if num_guesses >= NUMBER_OF_GUESSES:
print("You lost")
break

24
leaderboard.py Normal file
View file

@ -0,0 +1,24 @@
"""
Author: Brandon Rozek
View leaderboard information for a particular date
# TODO: argparse
"""
from wordguess import WordGuess
import sqlite3
from datetime import datetime
DATE = str(datetime.today().date())
if __name__ == "__main__":
con = sqlite3.connect(WordGuess.RESULTS_LOCATION)
try:
cur = con.cursor()
res = cur.execute(f"SELECT user, score FROM scores WHERE date = '{DATE}' ORDER BY score DESC")
for username, score in res.fetchall():
print(username, score)
finally:
con.close()

250
pubnix.py Normal file
View file

@ -0,0 +1,250 @@
"""
Author: Brandon Rozek
TODO: Handle a user trying to connect multiple
times at the same time.
This might be handled automatically if only one
user can play at a time...
"""
import json
import pwd
import os
import sys
import socket
import binascii
from pathlib import Path
from typing import Union
from contextlib import contextmanager
from dataclasses import dataclass
# __all__ = ['send_message', 'MESSAGE_BUFFER_LEN', 'ChallengeMessage']
MESSAGE_BUFFER_LEN = 1024
TOKEN_LENGTH = 50
TIMEOUT = 5 * 60 # 5 minutes
@contextmanager
def start_client(address):
# Create the Unix socket client
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Connect to the server
try:
client.connect(address)
except FileNotFoundError:
print("Game server is not running at location", address)
sys.exit(1)
try:
yield client
finally:
client.close()
def run_simple_client(address, fn, force_auth=True):
with start_client(address) as client:
if force_auth:
user = login(client)
send_message(client, StartMessage())
fn(client, user)
@contextmanager
def client_connection(sock):
connection, _ = sock.accept()
try:
yield connection
except (
ProtocolException,
BrokenPipeError,
TimeoutError,
ConnectionResetError) as e:
# Ignore as client can reconnect
pass
finally: # clean up the connection
connection.close()
def run_simple_server(address, fn, force_auth=True):
with start_server(address) as sock:
print("Started server at", address)
try:
while True:
with client_connection(sock) as connection:
user = None
if force_auth:
user = authenticate(connection)
receive_message(connection, StartMessage)
fn(connection, user)
except KeyboardInterrupt:
print("Stopping server...")
def generate_token(length):
# From https://stackoverflow.com/a/41354711
return binascii.hexlify(os.urandom(length // 2)).decode()
def find_owner(path: Union[str, Path]) -> str:
return Path(path).owner()
def generate_challenge(user):
return ChallengeMessage(
username=user,
token=generate_token(TOKEN_LENGTH),
location=f"/home/{user}/.pubnix_challenge"
)
def authenticate(connection):
# First message should be an authentication message
message = receive_message(connection, AuthenticateMessage)
user = message.username
# Send challenge message
challenge = generate_challenge(user)
send_message(connection, challenge)
# Second message should be validation message
message = receive_message(connection, ValidationMessage)
# Check that challenge file exists
if not os.path.exists(challenge.location):
close_with_error(connection, "Challange file doesn't exist")
# Check if user owns the file
if find_owner(challenge.location) != user:
close_with_error(connection, "Challange file not owned by user")
# Make sure we can read the file
if not os.access(challenge.location, os.R_OK):
close_with_error(connection, "Challange file cannot be read by server")
# Check contents of challenge file
with open(challenge.location, "r") as file:
contents = file.read()
if contents != challenge.token:
close_with_error(connection, "Token within challange file is incorrect")
# Send authentication successful message
send_message(connection, AuthSuccessMessage())
return user
MESSAGE_BUFFER_LEN = 1024
def login(connection):
# Send authentication message
user = pwd.getpwuid(os.geteuid()).pw_name
message = AuthenticateMessage(username=user)
send_message(connection, message)
# Receive challenge message
challenge = receive_message(connection, ChallengeMessage)
# Write to challenge file
with open(challenge.location, "w") as file:
file.write(challenge.token)
# Tell server to check the challenge file
send_message(connection, ValidationMessage())
try:
message = receive_message(connection, AuthSuccessMessage)
finally:
# Delete challenge file
os.unlink(challenge.location)
return user
class MessageEncoder(json.JSONEncoder):
def default(self, o):
return o.__dict__
def send_message(connection, message):
contents = json.dumps(message, cls=MessageEncoder).encode()
connection.sendall(contents)
def receive_message(connection, cls=None):
message = connection.recv(MESSAGE_BUFFER_LEN).decode()
try:
message = json.loads(message)
except Exception:
print("Received:", message, flush=True)
close_with_error(connection, "Invalid Message Received")
if cls is not None:
try:
message = cls(**message)
except (TypeError, AssertionError):
close_with_error(connection, "Expected message of type")
return message
@contextmanager
def start_server(address):
if os.path.exists(address):
print("game.sock exists -- game server already running")
sys.exit(1)
# Create a unix domain socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(TIMEOUT)
sock.bind(address)
sock.listen()
try:
yield sock
finally:
# Delete game.sock when finished
os.unlink(address)
class ProtocolException(Exception):
pass
def close_with_error(connection, content: str):
message = dict(type="error", message=content)
connection.sendall(json.dumps(message).encode())
raise ProtocolException()
@dataclass
class ChallengeMessage:
username: str
token: str
location: str
action: str = "challenge"
def __post_init__(self):
assert self.action == "challenge"
@dataclass
class AuthenticateMessage:
username: str
action: str = "authenticate"
def __post_init__(self):
assert self.action == "authenticate"
assert len(self.username) > 0
@dataclass
class ValidationMessage:
action: str = "validate"
def __post_init__(self):
assert self.action == "validate"
@dataclass
class AuthSuccessMessage:
type: str = "authentication_success"
def __post_init__(self):
assert self.type == "authentication_success"
@dataclass
class StartMessage:
action: str = "start"
def __post_init__(self):
assert self.action == "start"

210
server.py Normal file
View file

@ -0,0 +1,210 @@
"""
Author: Brandon Rozek
WordGuess game server
TODO: argparse
TODO: Multiple users trying to access it at same time?
TODO: Fix timeout issue
"""
from datetime import datetime
from functools import lru_cache
from pubnix import (
run_simple_server,
receive_message,
send_message
)
from collections import defaultdict
from wordguess import WordGuess
from typing import List
import os
import pickle
from pathlib import Path
import sqlite3
SEED = 838211
SAVE_LOCATION = "state.pickle"
@lru_cache
def char_positions(word: str):
"""
Return a dictionary with positions
of each character within a word.
Ex: "hello" -> {"h": [0], "e": [1], "l": [2, 3], "o": [4]}
"""
result = defaultdict(list)
for i, char in enumerate(word):
result[char].append(i)
return result
def make_zero():
return 0
def make_false():
return False
def make_default_dict_zero():
return defaultdict(make_zero)
def make_default_dict_false():
return defaultdict(make_false)
def make_default_dict_set():
return defaultdict(set)
class WordGuessServer:
def __init__(self, seed, word_length = 5, guesses_allowed = 6):
self.seed = seed
self.word_length = word_length
self.guesses_allowed = guesses_allowed
# date -> user str -> int
self.guesses_made = defaultdict(make_default_dict_zero)
# date -> user str -> bool
self.is_winner = defaultdict(make_default_dict_false)
# date -> user str -> set[char]
self.letters_guessed = defaultdict(make_default_dict_set)
def fix_permissions(self):
# 33152 = '-rw-------.'
# 33188 = '-rw-r--r--.'
os.chmod(__file__, 33152)
os.chmod("pubnix.py", 33188)
os.chmod("wordguess.py", 33188)
os.chmod("words.txt", 33188)
os.chmod("client.py", 33188)
Path(WordGuess.RESULTS_LOCATION).touch(33188)
Path(SAVE_LOCATION).touch(33152)
@staticmethod
def save_record(date, username, score):
con = sqlite3.connect(WordGuess.RESULTS_LOCATION)
try:
cur = con.cursor()
cur.execute(
"CREATE TABLE IF NOT EXISTS scores(user TEXT NOT NULL, score INT NOT NULL, date TIMESTAMP NOT NULL, PRIMARY KEY (user, date))"
)
cur.execute("INSERT INTO scores VALUES (?, ?, ?)", (username, score, date))
con.commit()
except sqlite3.IntegrityError:
print("Cannot write record:", (date, username, score))
finally:
con.close()
@lru_cache
def get_words(self):
words = []
game_words = "words.txt"
# guess_words = "words.txt"
with open(game_words, "r") as file:
lines = file.read().splitlines()
words.extend([l for l in lines if len(l) == self.word_length])
return words
def get_wotd(self, day):
words = self.get_words()
index = (day.year * day.month * day.day * self.seed) % len(words)
return words[index]
def valid_guess(self, guess: str):
if len(guess) != self.word_length:
return False
# Guess needs to be part of our
# dictionary
if guess not in self.get_words():
return False
return True
@staticmethod
def compare(expected: str, guess: str) -> List[str]:
output = ["_"] * len(expected)
counted_pos = set()
# (1) Check for letters in correct positions
for i, (e_char, g_char) in enumerate(zip(expected, guess)):
if e_char == g_char:
output[i] = e_char
gchar_pos = char_positions(expected)
for i, g_char in enumerate(guess):
if g_char in expected and output[i] in ["_", "*"]:
for pos in gchar_pos[g_char]:
if pos not in counted_pos:
output[i] = "*"
counted_pos.add(pos)
break
return output
def game(self, connection, user):
# As long as the connection is alive,
# treat the time the same
# NOTE: Timeout does exist in pubnix.py
today = datetime.today().date()
wotd = self.get_wotd(today)
gr = self.guesses_allowed - self.guesses_made[today][user]
is_winner = self.is_winner[today][user]
send_message(connection, WordGuess.GameStartMessage(is_winner, self.word_length, gr, list(self.letters_guessed[today][user])))
if self.is_winner[today][user]:
return
while not self.is_winner[today][user] and self.guesses_made[today][user] < self.guesses_allowed:
message = receive_message(connection, WordGuess.GuessMessage)
message.word = message.word.lower()
if not self.valid_guess(message.word):
send_message(connection, WordGuess.GuessResponseMessage(gr))
continue
self.is_winner[today][user] = message.word == wotd
if not self.is_winner[today][user]:
self.guesses_made[today][user] += 1
gr = self.guesses_allowed - self.guesses_made[today][user]
result = WordGuessServer.compare(wotd, message.word)
# Populate letters guessed
for c in message.word:
self.letters_guessed[today][user].add(c)
send_message(
connection,
WordGuess.GuessResponseMessage(
gr,
True,
self.is_winner[today][user],
result,
list(self.letters_guessed[today][user])
)
)
if self.is_winner[today][user]:
WordGuessServer.save_record(today, user, gr)
if __name__ == "__main__":
w = WordGuessServer(SEED)
# Load data structure if existent
if os.path.exists(SAVE_LOCATION):
with open(SAVE_LOCATION, "rb") as file:
w = pickle.load(file)
print("Successfully loaded game state")
# Make sure permissions are correct
# to prevent cheating...
w.fix_permissions()
try:
run_simple_server(WordGuess.ADDRESS, w.game)
finally:
# Save game data structure
print("Saving game state... ", end="")
with open(SAVE_LOCATION, "wb") as file:
pickle.dump(w, file)
print("Done.")

33
wordguess.py Normal file
View file

@ -0,0 +1,33 @@
"""
Author: Brandon Rozek
Contains common data structures
between WordGuess client and server
"""
from dataclasses import dataclass
class WordGuess:
RESULTS_LOCATION = "/home/rozek/repo/wordGuess/results.db"
ADDRESS = "/home/rozek/wordGuess/game.sock"
@dataclass
class GuessMessage:
word: str
action: str = "guess"
def __post_init__(self):
assert self.action == "guess"
@dataclass
class GuessResponseMessage:
guesses_remaining: int
valid: bool = False
winner: bool = False
hint: str = ""
letters_guessed: str = ""
@dataclass
class GameStartMessage:
is_winner: bool
num_characters: int
guesses_remaining: int
letters_guessed: str

5757
words.txt Normal file

File diff suppressed because it is too large Load diff