整理如下内容:
数据结构和算法
常用库和使用方法示例
Flask cheetsheet
数据结构和算法 一组序列,保存最后N个元素 使用deque,指定maxlen参数的值为N,例如:
1 2 3 4 5 6 7 8 9 >>> from collections import deque>>> dq = deque(maxlen=3 )>>> dq.append(1 )>>> dq.append(2 )>>> dq.append(3 )>>> dq.append(4 )>>> dq.append(5 )>>> print (dq)deque([3 , 4 , 5 ], maxlen=3 )
查找最大或最小的N个元素 除了直接排序,还可以利用heaq模块的nlargest()和nsmallest()方法,例如:
1 2 3 4 5 6 7 >>> nums = [3, 5, 2, 4, 1] >>> smallest = heapq.nsmallest(3, nums) >>> print(smallest) [1, 2, 3] >>> largest = heapq.nlargest(3, nums) >>> print(largest) [5, 4, 3]
常用库和使用方法示例 引用别的模块Logging logging_config.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import loggingimport os log_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'logs' ) os.makedirs(log_dir, exist_ok=True ) log_file = os.path.join(log_dir, 'app.log' ) logging.basicConfig( level=logging.DEBUG, format ='%(asctime)s - %(name)s - %(levelname)s - %(message)s' , handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) logger = logging.getLogger(__name__)
module_a.py
1 2 3 4 5 6 7 8 9 10 11 import logging_config logger = logging_config.logger def some_function (): logger.info("This is an info message from module_a." ) if __name__ == "__main__" : some_function()
module_b.py
1 2 3 4 5 6 7 8 9 10 11 import logging_config logger = logging_config.logger def another_function (): logger.error("This is an error message from module_b." ) if __name__ == "__main__" : another_function()
在这个例子中,logging_config.py负责配置日志,包括设置日志文件的路径。module_a.py和module_b.py都导入了logging_config模块,并使用其中配置好的logger对象来记录日志。这样,所有的日志都会写入到同一个日志文件中(在这个例子中是logs/app.log),并且日志格式和级别都是一致的。 请注意,这种方法假设你的项目结构允许logging_config.py中的__file__变量正确地定位到日志目录的上级目录。如果你的项目结构不同,你可能需要调整log_dir的计算方式。此外,如果你想要在每个模块中都有自己独立的日志配置(比如不同的日志文件路径或格式),你可能需要为每个模块分别配置日志。
用Queue实现生产者和消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import threading import queue import time import random # 创建一个队列对象 q = queue.Queue() # 生产者线程函数 def producer(q, count): for i in range(count): item = random.randint(1, 100) # 生产一个随机整数作为产品 q.put(item) # 将产品放入队列 print(f"Produced {item}") time.sleep(random.uniform(0.1, 0.5)) # 模拟生产时间 # 消费者线程函数 def consumer(q): while True: item = q.get() # 从队列中获取产品 if item is None: # 如果获取到的是 None,则停止消费 break print(f"Consumed {item}") time.sleep(random.uniform(0.1, 0.5)) # 模拟消费时间 q.task_done() # 通知队列任务已完成 # 创建并启动生产者线程 producer_thread = threading.Thread(target=producer, args=(q, 10)) producer_thread.start() # 创建并启动消费者线程 consumer_thread = threading.Thread(target=consumer, args=(q,)) consumer_thread.start() # 等待生产者线程完成 producer_thread.join() # 在生产者完成后,向队列发送一个 None 来通知消费者停止 q.put(None) # 等待消费者线程完成队列中的所有任务 q.join() # 注意:在实际应用中,消费者线程可能需要一种更优雅的方式来停止, # 而不是依赖于队列中的 None 值。这可以通过设置一个全局停止标志或使用其他线程同步机制来实现。
把list转换为字符串
判断变量是否为None 1 2 if x is not None: if not (x is None):
使用global修改全局变量 1 2 3 4 5 6 7 #!/usr/bin/env python3 x = 0 def inc(): global x x = x + 1 inc() print(x)
格式化打印字符串 1 'hello {language}' .format (language='Python' )
print打印不换行 1 2 print('a', end=' ') print('b', end=' ')
判断文件是否存在 1 2 3 abspath = '/etc/v1-config/xxx' if os.path.exists(abspath) == False : print ('file not exist' )
逐行读取文件内容,记录到数组, 再按原序去重 1 2 3 4 5 6 with open (domains_file, 'r' ) as f: content = f.readlines() for line in content: domain_list.append(line.strip('\n' )) directOutDomains = list (set (domain_list)) directOutDomains.sort(key=domain_list.index)
批量执行Shell语句 1 2 3 4 5 6 7 8 cmd = '''sed -i '$a\\acl direct-out-servers dstdomain {domain_list}' {cfg_path}; \ sed -i '$a\\cache_peer 127.0.0.1 parent 8081 0 no-query default login={u}:{p}' {cfg_path}; \ sed -i '$a\\cache_peer_access 127.0.0.1 allow !direct-out-servers' {cfg_path}; \ sed -i '$a\\cache_peer_access 127.0.0.1 deny all' {cfg_path}; \ sed -i '$a\\always_direct allow direct-out-servers' {cfg_path}; \ sed -i '$a\\never_direct allow !direct-out-servers' {cfg_path}; \ ''' .format (domain_list=domain_list_str, cfg_path=SQUID_CONF_PATH, u=generic_proxy.user, p=generic_proxy.passwd)os.system(cmd)
base64 decoding end encoding decoding
1 2 import base64proxy_user = base64.b64decode(proxy_user_base64 + '===' ).decode('ascii' , 'ignore' )
encoding
1 2 3 4 5 sample_string = "GeeksForGeeks is the best" sample_string_bytes = sample_string.encode("ascii" ) base64_bytes = base64.b64encode(sample_string_bytes) base64_string = base64_bytes.decode("ascii" )
给切片命名,使代码清晰可读 使用内置的slice函数创建切片,而不是硬编码下标,从而增强代码可读性,例如:
1 2 3 4 >>> ip="<127.0.0.1>" >>> GET_IP = slice(1,-1) >>> ip[GET_IP] '127.0.0.1'
解压序列,可迭代对象赋值给多个变量 1 2 3 >>> year, month, day = [2002, 6, 10] >>> print(year, month, day) 2002 6 10
可以用占位符,丢弃其他的值
1 2 3 year, _ , _ = [2002,6,10] print(year) 2002
解压可迭代对象赋值给多个变量 1 2 3 4 L = [1,2,3,4,5] first, *middle, last = grades print(middle) >>> [2,3,4]
Flask Cheetsheet Flask返回状态码 1 2 3 4 5 6 7 8 9 10 from flask import Flask, Response app = Flask(__name__) @app.route('/empty-200') def empty_200(): return Response(status=200) if __name__ == '__main__': app.run(debug=True)
Flask返回字符串+状态码 1 2 3 4 5 6 7 8 9 10 from flask import Flask app = Flask(__name__) @app.route('/return-string') def return_string(): return "Hello, this is a string!", 200 if __name__ == '__main__': app.run(debug=True)
Flask返回JSON+状态码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 from flask import Flask, jsonify app = Flask(__name__) @app.route('/return-json') def return_json(): data = { "message": "Hello, this is JSON!", "status": "success" } return jsonify(data), 200 if __name__ == '__main__': app.run(debug=True)
Flask设置响应体, 响应头, 状态码: https://cloud.tencent.com/developer/article/1546924