1 Star 0 Fork 1

红双的git / tcp_udp test

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
server.py 2.44 KB
一键复制 编辑 原始数据 按行查看 历史
红双的git 提交于 2016-01-20 17:35 . first commit
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
import socket
import threading
import SocketServer
import json, types,string
import os, time
import binascii
import getopt,sys
def usage():
    """Print the command-line help text for server.py to standard output."""
    # The leading and trailing empty entries reproduce the blank line that
    # surrounds the help text when it is printed.
    help_lines = (
        '',
        'server.py usage:',
        '-s,--server:listen ip ,like 192.168.1.1',
        '-p,--port :listen port,like 10000',
        '-d,--data :ascii data',
        '-h,--hexdata:hex data',
        '',
    )
    print('\n'.join(help_lines))
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    """Request handler that echoes one received TCP chunk back to the peer."""

    def handle(self):
        """Read up to 1024 bytes, log them as text and hex, then echo them.

        Only a single ``recv`` is performed per connection; whatever that
        call returns is printed twice (raw and hex-encoded) and sent back
        unchanged.
        """
        conn = self.request
        peer = self.client_address
        payload = conn.recv(1024)
        print("Tcp receive: %s from %s" % (payload, peer))
        hex_payload = binascii.b2a_hex(payload)
        print("Tcp receive[hex]: [%s] from %s" % (hex_payload, peer))
        conn.sendall(payload)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server combined with ThreadingMixIn.

    Nothing is overridden here; all behaviour comes from the two base
    classes, with the mix-in listed first so its methods take precedence.
    """
class MyUDPHandler(SocketServer.BaseRequestHandler):
    """Request handler that answers a UDP datagram with its upper-cased text."""

    def handle(self):
        """Log the stripped datagram (raw and hex) and reply in upper case.

        ``self.request`` is indexed as a pair: element 0 is the received
        payload and element 1 is the socket used to send the reply.
        """
        packet = self.request
        peer = self.client_address
        payload = packet[0].strip()
        reply_sock = packet[1]
        print("UDP receive:%s from %s" % (payload, peer))
        hex_payload = binascii.b2a_hex(payload)
        print("UDP receive[hex]:[%s] from %s" % (hex_payload, peer))
        reply_sock.sendto(payload.upper(), peer)
def _parse_args(argv):
    """Parse the command line into a ``(server, port, data)`` tuple.

    ``argv`` is the full argument vector including the program name.
    Options that were not supplied come back as ``None``.  Raises
    ``getopt.GetoptError`` for unknown or malformed options, ``ValueError``
    for a non-integer port, and ``TypeError`` / ``binascii.Error`` for
    invalid hex data.
    """
    # Every option takes a value, so each short option needs a trailing ':'
    # and each long option a trailing '='.  Without them getopt rejects
    # "--server=1.2.3.4" and hands back an empty value for "-h".
    shortargs = 's:p:d:h:'
    longargs = ['server=', 'port=', 'data=', 'hexdata=']
    server = None
    port = None
    data = None
    opts, _unused = getopt.getopt(argv[1:], shortargs, longargs)
    for opt, val in opts:
        if opt in ('-s', '--server'):
            server = val
        elif opt in ('-p', '--port'):
            port = int(val)
        elif opt in ('-d', '--data'):
            data = val
        elif opt in ('-h', '--hexdata'):
            data = binascii.a2b_hex(val)
    return server, port, data


def main(argv):
    """Run the TCP and UDP echo servers on the address given in ``argv``.

    ``argv`` is the full argument vector (program name first).  Both
    ``-s/--server`` and ``-p/--port`` are required; when either is missing
    or the options cannot be parsed, the usage text is printed and the
    process exits with status 2.  The ``-d`` / ``-h`` payload options are
    parsed for command-line compatibility but are not used by the server.

    Each server runs in its own daemon thread; this function blocks until
    both threads stop or the user interrupts with Ctrl-C, then shuts the
    servers down and closes their sockets.
    """
    try:
        host, port, _data = _parse_args(argv)
    except (getopt.GetoptError, ValueError, TypeError, binascii.Error) as e:
        usage()
        print(e)
        # Do not continue with half-parsed options.
        sys.exit(2)
    if host is None or port is None:
        usage()
        sys.exit(2)

    # Set on the base class (as before) so the listening address can be
    # re-bound immediately after a restart.
    SocketServer.TCPServer.allow_reuse_address = True
    tcp_server = ThreadedTCPServer((host, port), ThreadedTCPRequestHandler)
    # Port 0 means "select an arbitrary unused port": take the port TCP was
    # actually given so that UDP listens on the same number and the banner
    # below shows the real port.
    bound_port = tcp_server.server_address[1]
    udp_server = SocketServer.UDPServer((host, bound_port), MyUDPHandler)

    udp_thread = threading.Thread(target=udp_server.serve_forever)
    udp_thread.daemon = True
    udp_thread.start()
    tcp_thread = threading.Thread(target=tcp_server.serve_forever)
    tcp_thread.daemon = True
    tcp_thread.start()

    print("Listen on %s:%s" % (host, bound_port))
    print("Server loop running in thread: %s" % tcp_thread.name)
    print("Server loop running in thread: %s" % udp_thread.name)
    print(" .... waiting for connection")

    try:
        # Wait on the worker threads instead of starting a second
        # serve_forever() loop on the UDP server.  The short join timeout
        # keeps the main thread responsive to Ctrl-C.
        while tcp_thread.is_alive() or udp_thread.is_alive():
            tcp_thread.join(1)
            udp_thread.join(1)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; fall through to the
        # cleanup below.
        pass
    finally:
        for srv in (tcp_server, udp_server):
            srv.shutdown()
            srv.server_close()
if __name__ == "__main__":
    # Script entry point: hand the full argument vector (program name
    # included) to main().
    main(sys.argv)
Python
1
https://gitee.com/zhanghs/tcp_udp-test.git
git@gitee.com:zhanghs/tcp_udp-test.git
zhanghs
tcp_udp-test
tcp_udp test
master

搜索帮助