Published on

python 服务

Authors
  • avatar
    Name
    MissTree
    Twitter

服务器

tcp服务

# 服务器端代码
from socket import socket,AF_INET,SOCK_STREAM
# AF_INET 用于 internet 进程的通信
# SOCK_STREAM tcp协议
# 创建socket对象
server = socket(AF_INET,SOCK_STREAM)
# 绑定IP地址和端口号
ip='127.0.0.1'
port=8090
server.bind((ip,port))
# 监听连接 设置最大连接数量为5
server.listen(5)
print('服务器启动')
# 接受客户端连接
client_socket, client_addr = server.accept()
# 接收客户端发送的数据
data = client_socket.recv(1024)
print('客户端发送过来的数据',data.decode('utf-8'))
# 建立多次通信
while data!='bye':
    # 接收的数据
    if data!='':
        print('接收到的数据是',data)
    # 发送数据
    info=input("请输入发送客户端数据")
    client_socket.send(info.encode('utf-8'))

    if data=='bye':
        break
    data = client_socket.recv(1024).decode('utf-8')

# 关闭连接
client_socket.close()
server.close()


# 客户端代码 在pycharm中创建新窗口运行新项目
from socket import socket,AF_INET,SOCK_STREAM
# AF_INET 用于 internet 进程的通信
# SOCK_STREAM tcp协议
# 客户端创建socket对象
client = socket()
# 绑定服务端IP地址和端口号
ip='127.0.0.1'
port=8090
client.connect((ip,port))
# 监听连接
print('------与服务器建立连接-----')
# 发送数据给服务器端
client.send('Hello, client!'.encode('utf-8'))
#多次通信
info=''
while info!='bye':#假设输入bye就是关闭通信
    # 准备发送的数据
    send_data=input('请输入对服务端的通信')
    client.send(send_data.encode('utf-8'))

    if info=='bye':
        break
    # 接收一条数据
    info=client.recv(1024).decode('utf-8')
    print('响应的数据:',info)
# 关闭连接
client.close()

建立连接前要求服务端先启动,否则会报错:ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。

udp服务

# 服务器端代码
from socket import socket,AF_INET,SOCK_DGRAM
# AF_INET 用于 internet 进程的通信
# SOCK_STREAM UDP协议
# 创建socket对象
send_server = socket(AF_INET,SOCK_DGRAM)
# 指定接收端IP地址和端口号
ip_port=('127.0.0.1',8090)
# 发送数据
data=input("发送数据")

# 监听连接
send_server.sendto(data.encode('utf-8'),ip_port)
print('服务器启动')
# 接受客户端信息
client_data, client_addr = send_server.recvfrom(1024)
# 接收客户端发送的数据
print('接收到的数据',client_data.decode('utf-8'))
# 关闭连接
send_server.close()


# 客户端代码 在pycharm中创建新窗口运行新项目
from socket import socket,AF_INET,SOCK_DGRAM
# AF_INET 用于 internet 进程的通信
# SOCK_STREAM tcp协议
# 客户端创建socket对象
client = socket(AF_INET,SOCK_DGRAM)
# 绑定服务端IP地址和端口号
ip='127.0.0.1'
port=8090
client.bind((ip,port))
# 接收数据
print('------等待接收数据-----')
rece_data,addr=client.recvfrom(1024)
print('接收到的数据:',rece_data.decode('utf-8'))
# 发送数据给服务器端
callbackmsg=input('请输入回复数据')
client.sendto(callbackmsg.encode('utf-8'),addr)

# 关闭连接
client.close()


进程

是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位。每个进程都有自己独立的内存空间、系统资源(如文件描述符)等。

# 进程
from multiprocessing import Process

Process(group=None,target=None,name=None,args=(),kwargs={})

线程

每个进程都可能有多个线程,线程是进程中的一个执行单元,是 CPU 调度和分派的基本单位。因为线程共享进程的资源,线程间的通信更加方便,可以直接访问共享的内存变量,而且线程是无序的。但这也带来了一些问题,需要使用同步机制(如锁、信号量等)来保证数据的一致性。
线程问题:

  • 购买车票时,多个线程同时购买同一张票,因为判断还有票导致票被多个人抢
from threading import Thread
import time
import os
def func(name):
  for i in range(10):
    time.sleep(1)
    print(name,i,f'当前进程为{os.getpid()},父进程为{os.getppid()}')


if __name__ == '__main__':
  #创建线程
  t1 = Thread(target=func,args=('刘秀',))
  t2 = Thread(target=func,args=('赵匡胤',))
  t3 = Thread(target=func,args=('李世民',))

  t1.start()
  t2.start()
  t3.start()


class MyThread(Thread):
  def __init__(self,name):
     super(MyThread,self).__init__()
     self.name=name
  def run(self):
    for i in range(10):
      time.sleep(1)
      print(self.name,i,f'当前进程为{os.getpid()},父进程为{os.getppid()}')

if __name__ == '__main__':
  #创建线程
  t1 = MyThread('刘秀22')
  t2 = MyThread('赵匡胤33')
  t3 = MyThread('李世民444')

  t1.start()
  t2.start()
  t3.start()

# 线程锁
from multiprocessing import Pool
from threading import Thread,Lock,current_thread
import time,os,threading
# 售票数量
tickets=50
lock=Lock()
def sale_ticket():
    global tickets
    #每个窗口有100人排队
    for i in range(100):
      lock.acquire_lock()#上锁
      if tickets>0:
        print(f'{current_thread().name}正在售第{tickets}张票')
        tickets-=1
        time.sleep(0.5)
      lock.release_lock()#解锁

if __name__=='__main__':
    for i in range(3):
        t=Thread(target=sale_ticket)
        t.start()

线程池

方法名功能描述
apply_async(func,args,kwargs)使用非阻塞方式(异步方式)调用函数func
apply(func,args,kwargs)使用阻塞方式(同步方式)调用函数func
close()关闭进程池,不再接收新任务
terminate()不管任务是否完成,立即终止
join()阻塞主进程,必须在terminate()或close()之后使用
执行完线程池任务再执行后续代码
# 线程池
from multiprocessing import Pool
import time,os
def task(name):
  print(name,f'当前线程为{os.getpid()},父进程为{os.getppid()}')
  time.sleep(1)

if __name__ == '__main__':
  start=time.time()
  print("*"*20,start,"*"*20)
  # 创建线程池
  p=Pool(3)
  #创建线程
  for i in range(10):
      p.apply_async(func=task,args=(i,))
  # 关闭线程池,不再接收新任务
  p.close()
  # 阻塞主进程 等执行完计算时间
  p.join()
  print(time.time()-start)

# 线程池 支持遍历和线程回调
from  concurrent.futures import ThreadPoolExecutor
def func(name):
	return name

def call_back(value):
	print('*'*10,value.result())
		
if __name__ == '__main__':
	with ThreadPoolExecutor(10) as t:
		t.submit(func,'屌毛').add_done_callback(call_back)
		t.submit(func,'沙雕').add_done_callback(call_back)

  # 或者使用 map
 with ThreadPoolExecutor(10) as t:
   for i in t.map(func,['111','222','333']):
     print(i)

Pool和ThreadPoolExecutor、ProcessPoolExecutor区别:

  • Pool 是一个进程池,每个进程都有自己独立的内存空间,线程池是一个线程池,所有线程共享同一个内存空间。
    • 相对底层,使用时需要手动调用 close() 和 join() 等方法来确保资源的正确释放。
    • 各进程有独立的内存空间,能有效利用多核 CPU 的计算能力,适合处理 CPU 密集型任务。
    • 进程都有独立的内存空间,创建进程时会消耗较多的内存资源。
    • 进程间的数据共享较为复杂,需要借助 multiprocessing 模块提供的特定数据结构(像 Value、Array、Manager 等)来实现。
    • 场景:
      • 进程适合处理 CPU 密集型任务,如计算、图像处理等。
  • ThreadPoolExecutor 是一个阻塞式的线程池,线程池中的线程会一直等待任务的到来,直到线程池中的线程都被占用。
    • 支持 with 语句,自动关闭线程池,不需要手动调用 close() 和 join() 方法。
    • 过创建多个线程达成并发。线程共享进程的内存空间,开销相对较小,适合处理 I/O 密集型任务。
    • 线程共享进程的内存空间,创建线程的开销较小,内存使用相对较少。在处理大量小任务时,ThreadPoolExecutor 更为高效。
    • 线程共享进程的内存空间,数据共享相对容易,可直接访问和修改共享变量,但要注意线程安全问题,可使用 threading.Lock 等同步机制来保证数据的一致性。
    • 场景:
      • 适合处理大量小任务,每个任务的执行时间都比较短。
      • 适合处理 I/O 密集型任务,如网络请求、文件读写等。
  • ProcessPoolExecutor 是一个非阻塞式的进程池,进程池中的进程会一直等待任务的到来,直到进程池中的进程都被占用。
    • 基本和 ThreadPoolExecutor 使用类似。

共同点是线程池和进程池都可以用于并发执行任务,提高程序的执行效率。因为python是多线程语言,所以线程和进程执行完成的顺序是不一定的


进程

from multiprocessing import Process
import time,os
def func(name):
	for _ in range(10):
		print(f'{name}遍历次数:',_)
		time.sleep(1)
		
if __name__ == '__main__':
    p1=Process(target=func,args=('嬴政',))
    p2=Process(target=func,args=('毛主席',))

    p1.start()
    p2.start()

进程间的全局变量不能共享,导致多进程之间数据在执行后数据不正确


多任务异步协程

在执行异步任务的时候,一个任务执行的时间比较长,就会导致其他任务无法执行,为了解决这个问题,就可以使用协程来解决。原理大概是:在执行一个线程的时候,进入了任务等待时间,这个时候可以执行下一个或者多个线程任务,在线程任务响应时切换到对应的线程任务继续执行剩余的程序(最形象的就是工厂或者企业为了利益最大化,三班倒模式将时间利用到极致)。这样就大大提高了CPU的执行效率,协程的效率的大大高于线程池的。

#简单用法
import asyncio,time
# 执行的是一个协程对象
async def func1():
	print('我是一个异步函数1')
	await asyncio.sleep(1)
	print('我是异步函数1结束')
if __name__ == '__main__':
	"""协程对象要执行必须借助 event_loop"""
	event_loop=asyncio.get_event_loop()
	event_loop.run_until_complete(func())
	"""若是直接使用 asyncio.run(func()) 可能会报:
	Event Loop has closed!!  因为run在finnally时候会执行 loop.close()
	"""  

# 执行多个协程
async def func1():
	print('我是一个异步函数1')
	await asyncio.sleep(1)
	print('我是异步函数1结束')
	return '异步函数3结果'
async def func2():
	print('我是一个异步函数2')
	await asyncio.sleep(2)
	print('我是异步函数2结束')
	return '异步函数3结果'
async def func3():
	print('我是一个异步函数3')
	await asyncio.sleep(3)
	print('我是异步函数3结束')
	return '异步函数3结果'

async def main():
	start = time.time()
	# Create tasks from the coroutines
	task1 = asyncio.create_task(func1())
	task2 = asyncio.create_task(func2())
	task3 = asyncio.create_task(func3())
	
	# Wait for all tasks to complete
	# await asyncio.gather(task1, task2, task3)
	# Alternatively, you could use:
	done,pendding=await asyncio.wait([task1, task2, task3])
 lt = list(done)
	for _ in lt:
		print(_.result())
	"""
	asyncio.wait 返回结果是无序的
	result = await asyncio.gather(task1, task2, task3,return_exceptions=false)
	
	return_exceptions=true:如果有错误信息返回错误信息,其他任务正常执行
	return_exceptions=false:如果有错误信息返回错误信息,所有任务正常停止
	"""
	
	print(time.time() - start)
if __name__ == '__main__':
	"""协程对象要执行必须借助 event_loop"""
	# event_loop=asyncio.get_event_loop()
	# event_loop.run_until_complete(task)
	"""若是直接使用 asyncio.run(func()) 可能会报:
	Event Loop has closed!!  因为run在finnally时候会执行 loop.close()
	"""
	"""3.8版本前写法
	start=time.time()
	f1=func1()
	f2=func2()
	f3=func3()
	tasks=[f1,f2,f3]
	asyncio.run(asyncio.wait(tasks))
	print(time.time()-start)
	"""
	asyncio.run(main())

uvloop 是一个高性能的异步事件循环库,它是 Python 标准库 asyncio 的一个实现。uvloop 提供了比标准库 asyncio 更快的性能,特别是在处理大量并发连接时。接近于 GO 语言的协程性能。官网


进程和线程的使用

  • 线程:任务相对统一,互相特别相似。
  • 进程:多个任务相对独立,互相不相似没有交集。

队列

解决多进程之间数据共享问题

方法名称描述
qsize()消息数量获取当前队列包含的
empty()判断队列是否为空为空结果为True,否则为False
full()判断队列是否满了满结果为True,否则为False
get(block=True)获取队列中的一条消息,然后从队列中移除,block默认值为True
get_nowait()相当于get(block=False),消息队列为空时,抛出异常
put(item,block=True,time)将item消息放入队列,block默认为True
time默认一直等待入列,可以设置2(即2秒后不能入队报错)
put_nowait(item)相当于put(item,block=False),在队列满时直接报错
# 队列
from multiprocessing import Pool, Queue, Process, Value
import time,os
def add(q, shared_a):
  print(shared_a.value, f'add当前进程为{os.getpid()},父进程为{os.getppid()}')
  for i in range(6):
    with shared_a.get_lock():
        shared_a.value += 1
    q.put(shared_a.value)
    print(shared_a.value, 'add 入栈的值')

def reduce(q, shared_a):
  print(shared_a.value, f'reduce当前进程为{os.getpid()},父进程为{os.getppid()}')
  for i in range(6):
    with shared_a.get_lock():
        print(shared_a.value, 'reduce')
        shared_a.value -= 1
    q.put(shared_a.value)


if __name__ == '__main__':
  start = time.time()
  # 移除队列容量限制
  q = Queue()
  print("*" * 20, start, "*" * 20)
  # 使用 Value 实现多进程间变量共享
  shared_a = Value('i', 10)
  p1 = Process(target=add, args=(q, shared_a))
  p2 = Process(target=reduce, args=(q, shared_a))

  p1.start()
  p2.start()

  p1.join()
  p2.join()
  print(time.time() - start)

生产者消费者模型

多用于多线程或多进程中,通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

# 生产者消费者模型
from queue import Queue
from threading import Thread
import time
#创建一个生产类
class Producer(Thread):
  def __init__(self,name,queue):
    Thread.__init__(self,name=name)
    self.queue=queue
  def run(self):
    for _ in range(6):
      print(f'{self.name}将产品{_}放入队列')
      self.queue.put(_)
      time.sleep(1)
    print('生产者完成了数据的存放')

class Comsumer(Thread):
  def __init__(self,name,queue):
    Thread.__init__(self,name=name)
    self.queue=queue
  def run(self):
    for _ in range(5):
      value=self.queue.get()
      print(f'消费者线程{self.name}取出了{value}')
      time.sleep(1)
    print('-'*10+'消费完成了数据的读取'+'-'*10)

if __name__ == '__main__':
  q=Queue()
  p=Producer('Producer',q)
  c=Comsumer('Comsumer',q)

  p.start()
  c.start()

  p.join()
  c.join()
  print('主程序执行完成')


Selenium

Selenium是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。

aa

pymysql

pymysql是一个用于连接MySQL数据库的Python库。它提供了一个简单易用的API,用于执行SQL查询、插入、更新和删除操作。

import pymysql

# 创建连接
conn=pymysql.connect(
	user='xxxxx',
	password='xxxxxxx',
	host='host',
	database='xxxx',
	port=3306,
)
# 创建游标
cursor=conn.cursor()
metadata='{"ip": "192.168.1.1", "device": "iPhone"}'
sql='insert into comments (content, user_id, username, post_id, status)' \
	f"VALUES ('这是一条评论内容商城', 1, 'john_doe', 101, 'approved');"

try:
	result=cursor.execute(sql)
	print('-'*30)
	print(f'result',result)
	print('-'*30)
	conn.commit()
except e:
	conn.rollback()
finally:
	if cursor:
		cursor.close()
	conn.close()


pymongo

pymongo是一个用于连接MongoDB数据库的Python库。它提供了一个简单易用的API,用于执行MongoDB查询、插入、更新和删除操作。

from pymongo import MongoClient

def getdb ():
	client=MongoClient(
		host='localhost',
		port=27017
	)
	db = client['hello']
	return db

def add(db,table,data):
	if isinstance(data,dict):
		db[table].insert_one(data)
	else:
		db[table].insert_many(data)

def upd(db,table,condition,next):
	db[table].update_one(condition, {"$set":next})

def delt(db,table,condition):
	db[table].delete_many(condition)

if __name__ == '__main__':
	db=getdb()
	d={
		'file':'http://localhost:8000/0240902212049.jpg',
		'filename':'睁眼看世界',
		'user':'湾湾',
		'level':13
	}
	t=({
		'file':'http://localhost:8000/0240902212049.jpg',
		'filename':'睁眼看世界22',
		'user':'湾湾',
		'level':1
	},{
		
		'file': 'http://localhost:8000/0240902212049.jpg',
		'filename': '睁眼看世界11',
		'user': '湾湾',
		'level': 11
	})
	# add(db,'uploadInfo',d)
	# add(db,'uploadInfo',t)
	n={
		'file':'0240902212049.jpg',
		'filename':'睁1眼看世界',
		'user':'湾~湾',
		'level':10
	}
	
	# upd(db,'uploadInfo',{'filename':'测试'},n)
	delt(db,'uploadInfo',{'filename':None})