1 Star 4 Fork 4

eehut/serial2tcp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
serial2tcp.py 6.53 KB
一键复制 编辑 原始数据 按行查看 历史
eehut 提交于 2022-11-17 20:58 . project init
#!/usr/bin/env python3
#coding=utf-8
import socket
import select
import serial
import sys
import os
import signal
import time
from timeit import default_timer as timer
class Led:
    """Indicator LED controlled through a file in the extgpio sysfs directory.

    If no name is given, or the named entry does not exist, the object still
    tracks its on/off state but never writes to a file.
    """

    def __init__(self, _name=None):
        """Bind to the LED called *_name*, or to nothing when it is unusable."""
        self.name = _name
        self.path = None
        self.state = 0
        if not self.name:
            return
        candidate = os.path.join("/sys/devices/platform/extgpio/gpios", _name)
        if os.path.exists(candidate):
            self.path = candidate
        else:
            print("led({name}) does not exist".format(name=_name))

    def set_value(self, value):
        """Write the string *value* to the LED file; no-op when unbound."""
        if not self.path:
            return
        with open(self.path, "w") as fp:
            fp.write(value)

    def on(self):
        """Record the LED as lit and write "1"."""
        self.state = 1
        self.set_value("1")

    def off(self):
        """Record the LED as dark and write "0"."""
        self.state = 0
        self.set_value("0")

    def revert(self):
        """Toggle the LED: switch it off if it is on, otherwise on."""
        toggle = self.off if self.state == 1 else self.on
        toggle()
class Clients:
    """One connected TCP client: its socket and the peer address tuple."""

    def __init__(self, _sock, _addr):
        """Store the accepted socket and the address returned with it."""
        self.sock, self.addr = _sock, _addr

    def info(self):
        """Return the peer as a "host:port" string."""
        host = self.addr[0]
        port = self.addr[1]
        return "%s:%d" % (host, port)
def tcp_socket(addr, port):
    """Create a listening IPv4 TCP socket bound to (addr, port).

    SO_REUSEADDR is set before binding. Returns the listening socket, or
    None when bind() raises (the socket is closed first in that case).
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        listener.bind((addr, port))
    except Exception:
        listener.close()
        return None
    else:
        # Backlog of 10 pending connections.
        listener.listen(10)
        return listener
def open_serial(port, rate):
    """Open serial device *port* at baud rate *rate* with timeout=0.

    Returns the serial.Serial object, or None if constructing it raised
    any exception.
    """
    try:
        handle = serial.Serial(port, rate, timeout=0)
    except Exception:
        handle = None
    return handle
# Initialised to True; not referenced by any code visible in this file.
running = True
# Sockets passed to select() by the server loop (listener, wake-up socket,
# and every accepted client socket).
fds = []
# Clients objects for the currently connected TCP peers.
clients = []
# Connected socket pair: signal_handle() writes to signal_tx, and the server
# loop watches signal_rx so that a signal wakes select() and ends the loop.
signal_tx, signal_rx = socket.socketpair()
# Look up a client by its socket
def get_client(sock) -> Clients:
    """Return the Clients entry whose socket equals *sock*, or None."""
    return next((entry for entry in clients if entry.sock == sock), None)
def close_client(sock):
    """Close a client socket and drop every reference the server keeps to it.

    The socket is closed, removed from the select() watch list ``fds`` and
    its entry is removed from ``clients``. Calling this for a socket that is
    not registered is harmless.
    """
    # Close the connection.
    sock.close()
    # Stop watching the socket; tolerate a socket that was never registered.
    if sock in fds:
        fds.remove(sock)
    # Rebuild the client list in place instead of removing entries while
    # iterating over it (which skips elements). Slice assignment keeps the
    # same list object that the rest of the module refers to.
    clients[:] = [c for c in clients if c.sock is not sock]
def serial_to_tcp_server(serial_port, serial_rate, local_ip, local_port, led_name):
    """Bridge a serial port to every TCP client connected to a local server.

    Bytes read from the serial port are sent to all connected clients, and
    bytes received from any client are written to the serial port. While
    running, the LED named *led_name* (if any) is toggled every 0.1 s. The
    loop ends when data arrives on ``signal_rx`` (written by the signal
    handler).

    Parameters:
        serial_port -- serial device name passed to open_serial()
        serial_rate -- baud rate passed to open_serial()
        local_ip    -- local address the TCP server binds to
        local_port  -- TCP port the server listens on
        led_name    -- name of the run-indicator LED, or None

    Returns None. Prints a message and returns early when the serial port
    or the listening socket cannot be opened.
    """
    dev = open_serial(serial_port, serial_rate)
    if dev is None:
        print("open serial port failed, please check")
        return

    sock = tcp_socket(local_ip, local_port)
    if sock is None:
        print("open socket failed, please check")
        # Do not leak the already opened serial port.
        dev.close()
        return

    print("open serial and socket success")

    fds.append(sock)
    fds.append(signal_rx)

    # Run-indicator LED.
    led = Led(led_name)
    # Next toggle time, in seconds.
    led_flash = timer() + 0.1

    loop = True
    try:
        while loop:
            # Blink the LED.
            now = timer()
            if now > led_flash:
                led_flash = now + 0.1
                led.revert()

            # Forward pending serial data to every client.
            if dev.in_waiting > 0:
                data = dev.read(dev.in_waiting)
                # Iterate over a copy: a failing client is removed inside
                # the loop.
                for c in list(clients):
                    try:
                        # sendall() retries until the whole buffer is
                        # written; send() may write only part of it.
                        c.sock.sendall(data)
                    except OSError:
                        # A broken peer must not take the server down.
                        print("connection(%s) lost" % c.info())
                        close_client(c.sock)

            # Handle the wake-up socket, new connections and client data.
            r_fds, _, _ = select.select(fds, [], [], 0.01)
            for fd in r_fds:
                if fd is signal_rx:
                    # A signal was received: drain it and stop.
                    signal_rx.recv(128)
                    loop = False
                elif fd is sock:
                    client_sock, client_addr = sock.accept()
                    fds.append(client_sock)
                    client = Clients(client_sock, client_addr)
                    clients.append(client)
                    print("connection(%s) setup, total:%d" % (client.info(), len(clients)))
                else:
                    try:
                        data = fd.recv(1024)
                    except OSError:
                        # Treat a reset connection like an orderly close.
                        data = b""
                    if data:
                        dev.write(data)
                    else:
                        client = get_client(fd)
                        if client:
                            print("connection(%s) lost" % client.info())
                        close_client(fd)
    finally:
        # Release everything even when the loop ends with an exception.
        for c in list(clients):
            close_client(c.sock)
        for watched in (sock, signal_rx):
            if watched in fds:
                fds.remove(watched)
        dev.close()
        sock.close()
        led.off()
## Signal handling
def signal_handle(signum: int, frame):
    """Report the received signal and forward its number to the server loop.

    The number is written as decimal text to ``signal_tx``.
    """
    print("receive signal:", signum)
    ## Send it out
    payload = ("%d" % signum).encode()
    signal_tx.send(payload)
def print_usage(name):
    """Print the command-line help text, using *name* as the program name.

    The synopsis marks ``-l <led>`` and ``<serial>`` as optional because the
    program runs with defaults when they are omitted.
    """
    usage = """
Usage:
{program} -h -- print help
{program} [-r <baudrate>] [-a <address>] [-p <port>] [-l <led>] [<serial>]
Parameters:
serial -- serial device name, "COM1" or "/dev/ttyS1"
baudrate -- baudrate for serial port(115200)
address -- binding local IP address(0.0.0.0)
port -- tcp server listening port(6390)
led -- specify led indication
"""
    print(usage.format(program=name))
def _parse_args(args, program):
    """Parse an argv-style list into the server settings.

    Returns the tuple (serial_port, serial_rate, local_ip, local_port,
    dev_led), starting from the built-in defaults. Parsing stops at the
    first positional argument (taken as the serial device), at an option
    whose value is missing or starts with '-', or at an unknown option.
    Exits with status 1 on a non-numeric baud rate or port, and with
    status 0 after printing help for ``-h``.
    """
    serial_port = "/dev/ttyS1"
    serial_rate = 115200
    local_ip = "0.0.0.0"
    local_port = 6390
    dev_led = None

    n = len(args)
    i = 1
    while i < n:
        opt = args[i]
        # The bounds check must look at i + 1: an option given as the very
        # last argument has no value to read.
        if opt in ('-r', '-a', '-p', '-l') and i + 1 < n:
            value = args[i + 1]
            # startswith() is safe on an empty string, unlike value[0].
            if value.startswith('-'):
                break
            if opt == '-r':
                try:
                    serial_rate = int(value)
                except ValueError as e:
                    print("***Invalid baudrate rate:", e)
                    sys.exit(1)
            elif opt == '-a':
                local_ip = value
            elif opt == '-p':
                try:
                    local_port = int(value)
                except ValueError as e:
                    print("***Invalid local port:", e)
                    sys.exit(1)
            else:
                dev_led = value
            i += 2
        elif opt == '-h':
            print_usage(program)
            sys.exit(0)
        else:
            # First positional argument is the serial device; anything else
            # (unknown option, option without a value) just ends parsing.
            if not opt.startswith('-'):
                serial_port = opt
            break

    return serial_port, serial_rate, local_ip, local_port, dev_led


### Program entry point
def main_entry():
    """Parse the command line, install signal handlers and run the server.

    SIGINT and SIGTERM are routed to signal_handle() so the server loop can
    shut down cleanly.
    """
    program = os.path.basename(sys.argv[0])
    serial_port, serial_rate, local_ip, local_port, dev_led = _parse_args(sys.argv, program)

    print("Serial : {device}@{rate}".format(device = serial_port, rate = serial_rate))
    print("TCP Server : {address}:{port}".format(address = local_ip, port = local_port))

    signal.signal(signal.SIGINT, signal_handle)
    signal.signal(signal.SIGTERM, signal_handle)

    serial_to_tcp_server(serial_port, serial_rate, local_ip, local_port, dev_led)
# Start the bridge only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main_entry()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/eehut/serial2tcp.git
git@gitee.com:eehut/serial2tcp.git
eehut
serial2tcp
serial2tcp
master

搜索帮助