代码拉取完成,页面将自动刷新
import pymysql
import csv
# Configuration for data source
DATA_SOURCE = "csv" # Can be "database" or "csv"
def create_connection():
"""Create a database connection and return the connection object if using a database."""
if DATA_SOURCE == "database":
host_name = 'gz-cdb-d5jb7avj.sql.tencentcdb.com'
port_number = 29776
user_name = 'root'
password = 'a12345678'
db_name = 'load'
try:
connection = pymysql.connect(
host=host_name,
user=user_name,
password=password,
database=db_name,
port=port_number,
cursorclass=pymysql.cursors.DictCursor # Using dictionary cursor for easier handling
)
print("Connection to MySQL DB successful")
return connection
except pymysql.MySQLError as e:
print(f"The error '{e}' occurred")
return None
else:
return None # No connection needed for CSV
def execute_read_query(connection, query, params=None):
"""Execute a read query and return the results if using a database."""
if DATA_SOURCE == "database":
with connection.cursor() as cursor:
try:
cursor.execute(query, params)
return cursor.fetchall() # Fetch all results
except pymysql.MySQLError as e:
print(f"The error '{e}' occurred")
return []
else:
return [] # Return empty list for CSV
def read_csv(file_path):
"""Read data from a CSV file and return as list of dictionaries."""
data = []
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
data.append(row)
return data
def get_trucks(connection):
"""Fetch all trucks from the specified data source."""
if DATA_SOURCE == "database":
query = "SELECT * FROM trucks"
return execute_read_query(connection, query)
else:
return read_csv("../database/trucks.csv")
def get_packages(connection):
"""Fetch all packages from the specified data source."""
if DATA_SOURCE == "database":
query = "SELECT * FROM packages WHERE status = 0"
return execute_read_query(connection, query)
else:
return read_csv_filtered_by_status("../database/packages.csv", 0)
def read_csv_filtered_by_status(file_path, status):
"""Read CSV file and filter by status."""
filtered_packages = []
with open(file_path, mode='r', newline='') as file:
reader = csv.DictReader(file)
for row in reader:
if int(row['status']) == status:
filtered_packages.append(row)
return filtered_packages
def create_truck_loads_table(connection):
"""Create the truck_loads table if it does not exist and using a database."""
if DATA_SOURCE == "database":
create_table_query = """
CREATE TABLE IF NOT EXISTS truck_loads (
truck_id INT PRIMARY KEY,
max_capacity INT,
destinations JSON,
packages JSON,
total_weight INT,
num_packages INT
);
"""
with connection.cursor() as cursor:
cursor.execute(create_table_query)
connection.commit()
def get_truck_load(connection, truck_id):
"""Fetch truck load information by truck ID based on the data source."""
if DATA_SOURCE == "database":
query = "SELECT * FROM truck_loads WHERE truck_id = %s"
return execute_read_query(connection, query, (truck_id,))
else:
all_trucks = read_csv("../database/truck_loads.csv")
return [truck for truck in all_trucks if truck['truck_id'] == str(truck_id)]
def get_destinations(connection, destination_ids):
"""Fetch destination data by IDs based on the data source."""
if DATA_SOURCE == "database":
placeholders = ', '.join(['%s'] * len(destination_ids))
query = f"SELECT * FROM destinations WHERE destination_id IN ({placeholders})"
return execute_read_query(connection, query, destination_ids)
else:
all_destinations = read_csv("../database/destinations.csv")
return [destination for destination in all_destinations if destination['destination_id'] in map(str, destination_ids)]
def update_trucks(connection, data_frame):
"""Update trucks in the database with new data."""
with connection.cursor() as cursor:
# Clear the existing data
cursor.execute("DELETE FROM trucks")
# Prepare the insert query
insert_query = """
INSERT INTO trucks (id, plate_number, max_capacity)
VALUES (%s, %s, %s);
"""
# Iterate through DataFrame rows
for index, truck in data_frame.iterrows():
cursor.execute(insert_query, (truck['id'], truck['plate_number'], truck['max_capacity']))
connection.commit()
print("update_trucks ---- successed")
def update_packages(connection, data_frame):
"""Update packages in the database with new data."""
with connection.cursor() as cursor:
# Clear the existing data
cursor.execute("DELETE FROM packages")
# Prepare the insert query
insert_query = """
INSERT INTO packages (id, volume, level, destination_id, status)
VALUES (%s, %s, %s, %s, %s);
"""
# Iterate through DataFrame rows
for index, package in data_frame.iterrows():
cursor.execute(insert_query, (
package['id'],
package['volume'],
package['level'],
package['destination_id'],
package['status'],
))
connection.commit()
print("update_packages ---- successed")
def update_package_status(connection, processed_cargo_ids):
if processed_cargo_ids:
with connection.cursor() as cursor:
# 构造SQL查询来一次性更新所有被处理的包裹状态
query = "UPDATE packages SET status = 1 WHERE id IN (%s)" % ','.join(['%s'] * len(processed_cargo_ids))
cursor.execute(query, list(processed_cargo_ids))
connection.commit()
# 更新 destinations
def update_destinations(connection, data_frame):
"""Update destinations in the database with new data."""
with connection.cursor() as cursor:
# Optionally clear the existing data
cursor.execute("DELETE FROM destinations")
# Prepare the insert query
insert_query = """
INSERT INTO destinations (destination_id, community_name, address, x, y)
VALUES (%s, %s, %s, %s, %s);
"""
# Iterate through DataFrame rows
for index, destination in data_frame.iterrows():
cursor.execute(insert_query, (
destination['destination_id'],
destination['community_name'],
destination['address'],
destination['x'],
destination['y']
))
connection.commit()
print("update_destinations ---- successed")
# Usage example:
if __name__ == "__main__":
conn = create_connection()
trucks = get_trucks(conn)
print(trucks)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。