Upload files to 'CARDPUTER CircuitPython/projects/udp_socket_chat'

This commit is contained in:
2026-02-05 22:48:51 +00:00
parent 19040e1eeb
commit 9b2e6bbed5
2 changed files with 163 additions and 0 deletions

View File

@@ -0,0 +1,72 @@
import socket, sys, datetime, threading, json
class Client:
def __init__(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("Created socket object")
self.client_name = "CLI"
self.server_ip = "cubes.link"
self.server_port = 1337
self.server_address = (self.server_ip, self.server_port)
a = {"header":"connection",
"client name":self.client_name}
self.send(a, self.server_address)
def send(self, data, addr):
data = json.dumps(data).encode()
self.sock.sendto(data, addr)
def run(self):
while True:
data, addr = self.sock.recvfrom(65507)
a = json.loads(data)
header = a["header"]
if header == "response":
message = a["message"]
client_name = a["client name"]
print(client_name + " >> " + message)
elif header == "connection":
motd = a["motd"]
client_name = a["client name"]
print(">> " + client_name + " <<")
class Commands:
def __init__(self):
self.commands={"test":self.test,
"stop":self.stop}
self.running = True
def test(self, params):
print("this is a test command")
def stop(self, params):
print("this doesn't do anything rn")
def run(self):
while self.running:
text = input()
if text[0] == "/":
parts = text.split(" ")
command = parts[0].strip("/")
params = parts.pop(0)
if command in self.commands:
self.commands[command](params)
else:
a = {"header":"message",
"message":text,
"client name":client.client_name}
client.send(a, client.server_address)
client = Client()
commands = Commands()
client_thread = threading.Thread(target = client.run)
commands_thread = threading.Thread(target = commands.run)
client_thread.start()
commands_thread.start()

View File

@@ -0,0 +1,91 @@
import socket, sys, datetime, threading, json
class Server:
def __init__(self):
self.version = 1.0
print("Server version: {}".format(self.version))
self.motd = "cubey"
self.color = 0xe973ea
self.users = []
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("Created socket object")
self.ip = "0.0.0.0"
self.port = 1337
self.address = (self.ip, self.port)
self.sock.bind(self.address)
print("Server started up on {}".format(self.address))
def send(self, data, client):
data = json.dumps(data).encode()
self.sock.sendto(data, client)
def send_all(self, data):
for user in self.users:
self.send(data, user)
def connection(self, address, connecting_client_name):
print("New connection from {}".format(address))
a = {"header":"connection",
"motd":self.motd,
"color":self.color,
"client name":connecting_client_name}
print(">> " + connecting_client_name + " <<")
self.users.append(address)
self.send_all(a)
def run(self):
while True:
try:
data, addr = self.sock.recvfrom(65507)
a = json.loads(data)
header = a["header"]
client_name = a["client name"]
if header == "connection":
self.connection(addr, client_name)
elif header == "message":
message = a["message"]
print(client_name + " >> " + message)
response = {"header":"response",
"message":message,
"color":0xffffff,
"client name":client_name}
if addr not in self.users:
self.users.append(addr)
self.send_all(response)
except OSError:
return
class Commands:
def __init__(self):
self.commands={"test":self.test,
"stop":self.stop}
self.running = True
def test(self):
print("this is a test command")
def stop(self):
print("Stopping server...")
server.sock.close()
server_thread.join()
self.running = False
def run(self):
while self.running:
command = input()
if command in self.commands:
self.commands[command]()
else:
print("Unknown command")
server = Server()
commands = Commands()
server_thread = threading.Thread(target = server.run)
commands_thread = threading.Thread(target = commands.run)
server_thread.start()
commands_thread.start()