Add 'CARDPUTER CircuitPython/projects/udp_socket_chat/code.py'

This commit is contained in:
2026-02-05 22:41:27 +00:00
parent 3f6ff246fb
commit bf4780dcea

View File

@@ -0,0 +1,228 @@
from keyb import Keyboard # type: ignore
import json, wifi, os, socketpool, asyncio, sys, select # type: ignore
import board, busio, terminalio, displayio # type: ignore
from adafruit_display_text import label # type: ignore
from adafruit_display_shapes.rect import Rect # type: ignore
async def main():
keyb_task = asyncio.create_task(keyb_scan())
socket_task = asyncio.create_task(socket_recv())
await asyncio.gather(keyb_task, socket_task)
async def keyb_scan():
while True:
if select.select([sys.stdin,],[],[],0.1)[0]:
code=hex(ord(sys.stdin.readline(1)))
key = keyb.scan(keycode = code)
if keyb.is_alphanumeric(key):
if len(textbar.text) < 31:
textbar.text += str(key)
elif key == "\n":
a = {"header":"message",
"message":textbar.text,
"client name":client_name}
data = json.dumps(a)
sock.sendto(data, address)
textbar.text = ""
elif key == "BACKSP":
textbar.text = textbar.text[:-1]
elif key == " ":
if len(textbar.text) < 31:
textbar.text += " "
else:
await asyncio.sleep(0.1)
async def socket_recv():
while True:
try:
buffer = bytearray(200)
size, addr = sock.recvfrom_into(buffer)
try:
a = json.loads(buffer)
header = a["header"]
if header == "response":
msg = a["message"]
try:
color = a["color"]
except KeyError:
color = 0xffffff
try:
recv_client_name = a["client name"]
except KeyError:
recv_client_name = "(unknown)"
cmsg = recv_client_name + " >> " + msg
messages_list.append([cmsg, color])
update_gui()
elif header == "connection":
motd = a["motd"]
conn_client = a["client name"]
try:
color = a["color"]
except KeyError:
color = 0xffffff
con_msg = ">> " + conn_client + " <<"
messages_list.append([con_msg, color])
update_gui()
except Exception as e:
error_msg = str(e) + " (buffer size?)"
color = 0xff0000
messages_list.append([error_msg, color])
update_gui()
except OSError:
pass
await asyncio.sleep(0.1)
def update_gui():
x = 0
for message in messages_list[-max_messages:]:
messages_list_labels[x].text = message[0]
messages_list_labels[x].color = message[1]
x += 1
keyb = Keyboard()
# SET UP DISPLAY & WIFI
display = board.DISPLAY
display_group = displayio.Group()
display.root_group = display_group
wifi_options = [[os.getenv("WIFI1"),os.getenv("WIFIPW1")],
[os.getenv("WIFI2"),os.getenv("WIFIPW2")],
[os.getenv("WIFI3"),os.getenv("WIFIPW3")]]
h1 = label.Label(terminalio.FONT, text="Hello!", color=0xffffff, scale=2)
h1.x = 10
h1.y = 15
count = 0
menu_labs = []
for option in wifi_options:
if count == 0:
menu_lab = label.Label(terminalio.FONT, text = "> " + option[0], color=0xffffff, scale=1)
else:
menu_lab = label.Label(terminalio.FONT, text = option[0], color=0xffffff, scale=1)
menu_lab.x = 10
menu_lab.y = 35 + count * 10
display.root_group.append(menu_lab)
menu_labs.append(menu_lab)
count+=1
display.root_group.append(h1)
pos = 0
while True:
key = keyb.scan()
if key == "UP":
if pos == 0:
pass
else:
menu_labs[pos].text = wifi_options[pos][0]
pos -= 1
menu_labs[pos].text = "> " + wifi_options[pos][0]
elif key == "DOWN":
if pos == len(wifi_options)-1:
pass
else:
menu_labs[pos].text = wifi_options[pos][0]
pos += 1
menu_labs[pos].text = "> " + wifi_options[pos][0]
elif key == "\n":
WIFI_SSID = wifi_options[pos][0]
WIFI_PASS = wifi_options[pos][1]
break
try:
for lab in menu_labs:
display.root_group.remove(lab)
txt = label.Label(terminalio.FONT, text = "", color=0x00ff00, scale=1)
txt.text = "Connecting to " + WIFI_SSID + "..."
txt.x = 10
txt.y = 35
display.root_group.append(txt)
wifi.radio.connect(ssid=WIFI_SSID,
password=WIFI_PASS)
my_ip = str(wifi.radio.ipv4_address).strip()
display.root_group.remove(h1)
display.root_group.remove(txt)
except ConnectionError as e:
h1.text = "Aw..."
txt.text = "Could not connect to " + WIFI_SSID
txt.color = 0xff0000
txt.x = 10
txt.y = 35
txt2 = label.Label(terminalio.FONT, text = "", color=0x89a0a8, scale=1)
txt2.text = "Press CTRL + C followed by CTRL + D\nto soft-reboot"
txt2.x = 10
txt2.y = 100
display.root_group.append(txt2)
while True:
pass
# Header Bar
header_bar = Rect(0, 0, 238, 30, fill=0x1f4476)
display.root_group.append(header_bar)
# Page Title
title_label = label.Label(terminalio.FONT, text="Chat", color=0xffffff, scale=2)
display.root_group.append(title_label)
title_label.x = 10
title_label.y = 15
# IP Label
ip_label = label.Label(terminalio.FONT, text=my_ip, color=0x89a0a8, scale=1)
display.root_group.append(ip_label)
ip_label.x = 150
ip_label.y = 20
pool = socketpool.SocketPool(wifi.radio)
sock = pool.socket(pool.AF_INET, pool.SOCK_DGRAM)
sock.settimeout(3)
sock.setblocking(False)
ip = "cubes.link"
port = 1337
address = (ip, port)
client_name = "CARDPUTER"
a = {"header":"connection",
"client name":client_name}
data = json.dumps(a)
sock.sendto(data, address)
textbar = label.Label(terminalio.FONT, text = "", color = 0x89a0a8, scale=1)
display.root_group.append(textbar)
textbar.x = 10
textbar.y = 120
messages_list = []
messages_list_labels = []
max_messages = 8
for x in range(0, max_messages):
mlab = label.Label(terminalio.FONT, text="", color=0xffffff, scale=1)
mlab.x = 8
mlab.y = 38 + (x * 10)
display.root_group.append(mlab)
messages_list_labels.append(mlab)
asyncio.run(main())