diff --git a/2023-11-21_oil_price.html b/2023-11-21_oil_price.html new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/modules/_crawler.py b/modules/_crawler.py index 1d065132c325594d49ddcc2f041930559926eb80..54ef678bf032e8b68a394e7ff66ebc70d7342151 100644 --- a/modules/_crawler.py +++ b/modules/_crawler.py @@ -4,245 +4,259 @@ from config import Config from ._datapackage import * from abc import ABC, abstractmethod import pandas as pd -import requests +import requests, json, os, time +from time import sleep from loguru import logger -import json -import time +from lxml import etree - -''' +""" 爬虫类 -''' +""" class CrawlerInterface(DataPackageHandler): def __init__(self): super().__init__() - self._callback_func_dict: dict[str: callable] - self._input_columns_type_dict: list[dict[str: list[callable]]] = [Config.DataPackageConfig.NOT_INIT_COLUMNS, Config.CrawlerConfig.RESPONSE_COLUMNS] - self._output_columns_type_dict: list[dict[str: list[callable]]] = [Config.CrawlerConfig.RESPONSE_COLUMNS] + self._callback_func_dict: dict[str:callable] + self._input_columns_type_dict: list[dict[str : list[callable]]] = [ + Config.DataPackageConfig.NOT_INIT_COLUMNS, + Config.CrawlerConfig.RESPONSE_COLUMNS, + ] + self._output_columns_type_dict: list[dict[str : list[callable]]] = [ + Config.CrawlerConfig.RESPONSE_COLUMNS + ] self.failed_tag = Config.DataPackageConfig.CHILD_NOT_INIT_TAG self.try_again_seconds: int - - - @ abstractmethod - def request_and_process_data(self)-> pd.DataFrame|None: - ''' + + @abstractmethod + def request_and_process_data(self) -> pd.DataFrame | None: + """ 子类必须重写的抽象方法, 爬取数据, 处理成给定字段的pd.DataFrame, 如果爬取失败,返回None - ''' + """ pass - - - - def _run(self, datapackage: DataPackage)-> None: - ''' + + def _run(self, datapackage: DataPackage) -> None: + """ 子类无需关心的方法, 用于运行爬虫和进行错误处理 - ''' + """ result = self.request_and_process_data() - if result is None: # 返回一个请求失败的包 datapackage.tag_stack.append(self.failed_tag) - datapackage.df = pd.DataFrame({"爬取时间":['ERROR'], "响应文本":["ERROR"]}) - datapackage_str = str(datapackage).replace('\n', '\n\t') - logger.error(f"<处理器: {type(self).__name__} 环节: 无响应或响应不合法>:\n\t重新请求时间: {self.try_again_seconds}秒之后\n\t数据包细节:\n\t{datapackage_str}") + datapackage.df = pd.DataFrame({"爬取时间": ["ERROR"], "响应文本": ["ERROR"]}) + datapackage_str = str(datapackage).replace("\n", "\n\t") + logger.error( + f"<处理器: {type(self).__name__} 环节: 无响应或响应不合法>:\n\t重新请求时间: {self.try_again_seconds}秒之后\n\t数据包细节:\n\t{datapackage_str}" + ) return False - datapackage.df = result - datapackage_str = str(datapackage).replace('\n', '\n\t') - logger.success(f"<处理器: {type(self).__name__} 环节: 得到成功的响应>:\n\t数据包细节:\n\t{datapackage_str}") + datapackage_str = str(datapackage).replace("\n", "\n\t") + logger.success( + f"<处理器: {type(self).__name__} 环节: 得到成功的响应>:\n\t数据包细节:\n\t{datapackage_str}" + ) return True - - - def _rerun(self, datapackage: DataPackage)-> None: - if check_time_exceeded(datapackage.init_time, '%Y-%m-%d %H:%M:%S.%f', self.try_again_seconds): + + def _rerun(self, datapackage: DataPackage) -> None: + if check_time_exceeded( + datapackage.init_time, "%Y-%m-%d %H:%M:%S.%f", self.try_again_seconds + ): is_success = self._run(datapackage) if not is_success: # 如果二次请求还是失败,那么直接让回收器销毁数据包吧 - datapackage_str = str(datapackage).replace('\n', '\n\t') - logger.critical(f"<处理器: {type(self).__name__} 环节: 二次请求失败,停止本轮爬取任务>:\n\t数据包细节:\n\t{datapackage_str}") + datapackage_str = str(datapackage).replace("\n", "\n\t") + logger.critical( + f"<处理器: 
{type(self).__name__} 环节: 二次请求失败,停止本轮爬取任务>:\n\t数据包细节:\n\t{datapackage_str}" + ) datapackage.tag_stack = [] - return - # 如果没有到时间, 把处理前弹出的failed_tag补一个新的回去 datapackage.tag_stack.append(self.failed_tag) - - - -@ singleton + + +@singleton class VegetableCrawler(CrawlerInterface): def __init__(self): super().__init__() self._callback_func_dict = { Config.DataPackageConfig.VEGETABLE_PRICE_RESPONSE_GET_TAG: self._run, Config.DataPackageConfig.VEGETABLE_PRICE_CRAWL_FAILED_TAG: self._rerun, - } + } self.failed_tag = Config.DataPackageConfig.VEGETABLE_PRICE_CRAWL_FAILED_TAG self.url = Config.CrawlerConfig.VegetableCrawlerConfig.URL - self.try_again_seconds = Config.CrawlerConfig.VegetableCrawlerConfig.TRY_AGAIN_SECONDS + self.try_again_seconds = ( + Config.CrawlerConfig.VegetableCrawlerConfig.TRY_AGAIN_SECONDS + ) - def request_and_process_data(self): # 这是我实现的爬虫类, 可以参考它来编写天气和经济的爬虫 try: - response = requests.post(url = self.url, timeout=30) + response = requests.post(url=self.url, timeout=30) except: return None - text = response.text statue_code = response.status_code if statue_code != 200: return None - - result_df = pd.DataFrame({'爬取时间':[Config.CrawlerConfig.NOW_TIME_FUNC()], '响应文本':[text]}) + result_df = pd.DataFrame( + {"爬取时间": [Config.CrawlerConfig.NOW_TIME_FUNC()], "响应文本": [text]} + ) return result_df - -@ singleton +@singleton class DailyWeatherCrawler(CrawlerInterface): def __init__(self): super().__init__() self._callback_func_dict = { Config.DataPackageConfig.DAILY_WEATHER_RESPONSE_GET_TAG: self._run, Config.DataPackageConfig.DAILY_WEATHER_CRAWL_FAILED_TAG: self._rerun, - } + } self.failed_tag = Config.DataPackageConfig.DAILY_WEATHER_CRAWL_FAILED_TAG - - self.url = Config.CrawlerConfig.DailyWeatherCrawlerConfig.URL # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容 + self.url = ( + Config.CrawlerConfig.DailyWeatherCrawlerConfig.URL + ) # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容 # self.try_again_seconds = ??? 
# 请指定请求失败后的重新请求时间, 该属性必须填写 self.cityids = Config.CrawlerConfig.DailyWeatherCrawlerConfig.cityids self.headers = Config.CrawlerConfig.DailyWeatherCrawlerConfig.headers self.year = Config.CrawlerConfig.DailyWeatherCrawlerConfig.current_year self.month = Config.CrawlerConfig.DailyWeatherCrawlerConfig.current_month - + def request_and_process_data(self): - text = "" for cityid in self.cityids: - params = { - "areaInfo[areaId]": cityid, - "areaInfo[areaType]": 2, - "date[year]": self.year, - "date[month]": self.month + "areaInfo[areaId]": cityid, + "areaInfo[areaType]": 2, + "date[year]": self.year, + "date[month]": self.month, } - try: - response = requests.get(url = self.url, headers=self.headers, params=params) + response = requests.get( + url=self.url, headers=self.headers, params=params + ) text += response.text + "$$$" - except: return None - # 如果请求数据成功, 那么返回给定字段的pd.DataFrame,值得注意的是,'爬取时间'的值是固定的,也就是使用统一的Config.CrawlerConfig.NOW_TIME_FUNC函数 # '响应文本'就是获取到的源数据,它虽然需要被存储到数据库, 但对字符串本身的格式没有要求, 因为将来你还需自定义相应的数据清洗器来将它转变成规定的格式 # 如果你对需要返回的格式仍旧有疑问, 那么请转到父类CrawlerInterface的_output_columns_type_dict属性, 它规定了从这儿走出去的数据包的表格的字段需要是什么样子的 - result_df = pd.DataFrame({'爬取时间':[Config.CrawlerConfig.NOW_TIME_FUNC()], '响应文本':[text]}) - + result_df = pd.DataFrame( + {"爬取时间": [Config.CrawlerConfig.NOW_TIME_FUNC()], "响应文本": [text]} + ) + # 最后如果请求成功,则返回这个表 return result_df - - -@ singleton + + +@singleton class ExtremeWeatherCrawler(CrawlerInterface): def __init__(self): super().__init__() self._callback_func_dict = { Config.DataPackageConfig.EXTREME_WEATHER_RESPONSE_GET_TAG: self._run, Config.DataPackageConfig.EXTREME_WEATHER_CRAWL_FAILED_TAG: self._rerun, - } + } self.failed_tag = Config.DataPackageConfig.EXTREME_WEATHER_CRAWL_FAILED_TAG - - self.url = Config.CrawlerConfig.ExtremeWeatherCrawlerConfig.URL # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容 + + self.url = ( + Config.CrawlerConfig.ExtremeWeatherCrawlerConfig.URL + ) # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容 # self.try_again_seconds = ??? # 请指定请求失败后的重新请求时间, 该属性必须填写 - def request_and_process_data(self): - is_request_success = False # 请求是否成功? + is_request_success = False # 请求是否成功? # 请求数据 # ... # 得到的数据以字符串存储 text = "1234567" - + if not is_request_success: # 请求失败则返回None return None - - + # 如果请求数据成功, 那么返回给定字段的pd.DataFrame,值得注意的是,'爬取时间'的值是固定的,也就是使用统一的Config.CrawlerConfig.NOW_TIME_FUNC函数 # '响应文本'就是获取到的源数据,它虽然需要被存储到数据库, 但对字符串本身的格式没有要求, 因为将来你还需自定义相应的数据清洗器来将它转变成规定的格式 # 如果你对需要返回的格式仍旧有疑问, 那么请转到父类CrawlerInterface的_output_columns_type_dict属性, 它规定了从这儿走出去的数据包的表格的字段需要是什么样子的 - result_df = pd.DataFrame({'爬取时间':[Config.CrawlerConfig.NOW_TIME_FUNC()], '响应文本':[text]}) - + result_df = pd.DataFrame( + {"爬取时间": [Config.CrawlerConfig.NOW_TIME_FUNC()], "响应文本": [text]} + ) + # 最后如果请求成功,则返回这个表 return result_df - - -@ singleton + + +@singleton class EconomyCrawler(CrawlerInterface): def __init__(self): super().__init__() self._callback_func_dict = { Config.DataPackageConfig.ECONOMY_RESPONSE_GET_TAG: self._run, Config.DataPackageConfig.ECONOMY_CRAWL_FAILED_TAG: self._rerun, - } + } self.failed_tag = Config.DataPackageConfig.ECONOMY_CRAWL_FAILED_TAG - self.url = Config.CrawlerConfig.EconomyCrawlerConfig.URL # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容 + self.url = ( + Config.CrawlerConfig.EconomyCrawlerConfig.URL + ) # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容 # self.try_again_seconds = ??? # 请指定请求失败后的重新请求时间, 该属性必须填写 - def request_and_process_data(self): - is_request_success = False # 请求是否成功? + is_request_success = False # 请求是否成功? # 请求数据 # ... 
# 得到的数据以字符串存储
         text = "1234567"
-        
+
         if not is_request_success:
             # 请求失败则返回None
             return None
-        
-        
+
         # 如果请求数据成功, 那么返回给定字段的pd.DataFrame,值得注意的是,'爬取时间'的值是固定的,也就是使用统一的Config.CrawlerConfig.NOW_TIME_FUNC函数
         # '响应文本'就是获取到的源数据,它虽然需要被存储到数据库, 但对字符串本身的格式没有要求, 因为将来你还需自定义相应的数据清洗器来将它转变成规定的格式
         # 如果你对需要返回的格式仍旧有疑问, 那么请转到父类CrawlerInterface的_output_columns_type_dict属性, 它规定了从这儿走出去的数据包的表格的字段需要是什么样子的
-        result_df = pd.DataFrame({'爬取时间':[Config.CrawlerConfig.NOW_TIME_FUNC()], '响应文本':[text]})
-        
+        result_df = pd.DataFrame(
+            {"爬取时间": [Config.CrawlerConfig.NOW_TIME_FUNC()], "响应文本": [text]}
+        )
+
         # 最后如果请求成功,则返回这个表
         return result_df
-    
-
-@ singleton
-class OilCrawler(CrawlerInterface):
+
+
+@singleton
+class OilPriceCrawler(CrawlerInterface):
     def __init__(self):
         super().__init__()
         self._callback_func_dict = {
             Config.DataPackageConfig.OIL_PRICE_RESPONSE_GET_TAG: self._run,
             Config.DataPackageConfig.OIL_PRICE_CRAWL_FAILED_TAG: self._rerun,
-            }
+        }
         self.failed_tag = Config.DataPackageConfig.OIL_PRICE_CRAWL_FAILED_TAG
-        
-        self.url = Config.CrawlerConfig.OilCrawlerConfig.URL # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容
-        # self.try_again_seconds = ??? # 请指定请求失败后的重新请求时间, 该属性必须填写
+        self.url = Config.CrawlerConfig.OilCrawlerConfig.URL
+        # 通过这里来了解如何从配置文件获取常量, 当然你可以在自己的配置类里增加任何内容
+        # 重试间隔暂时沿用蔬菜爬虫的配置
+        self.try_again_seconds = (
+            Config.CrawlerConfig.VegetableCrawlerConfig.TRY_AGAIN_SECONDS
+        )
+        # 后续可在OilCrawlerConfig中单独定义TRY_AGAIN_SECONDS

-
     def request_and_process_data(self):
-        is_request_success = True # 请求是否成功?
+        is_request_success = False  # 请求是否成功?
         # 请求数据
         # ...
         # 得到的数据以字符串存储
         text = "1234567"
-        
+
         if not is_request_success:
             # 请求失败则返回None
             return None
-        
-        
+
         # 如果请求数据成功, 那么返回给定字段的pd.DataFrame,值得注意的是,'爬取时间'的值是固定的,也就是使用统一的Config.CrawlerConfig.NOW_TIME_FUNC函数
         # '响应文本'就是获取到的源数据,它虽然需要被存储到数据库, 但对字符串本身的格式没有要求, 因为将来你还需自定义相应的数据清洗器来将它转变成规定的格式
         # 如果你对需要返回的格式仍旧有疑问, 那么请转到父类CrawlerInterface的_output_columns_type_dict属性, 它规定了从这儿走出去的数据包的表格的字段需要是什么样子的
-        result_df = pd.DataFrame({'爬取时间':[Config.CrawlerConfig.NOW_TIME_FUNC()], '响应文本':[text]})
-        
+        result_df = pd.DataFrame(
+            {"爬取时间": [Config.CrawlerConfig.NOW_TIME_FUNC()], "响应文本": [text]}
+        )
+
         # 最后如果请求成功,则返回这个表
-    return result_df
\ No newline at end of file
+        return result_df
+
+
diff --git a/modules/_datamanager.py b/modules/_datamanager.py
index fbe0ce4726a0ade1d641ee3ca2b93ff5e44a7794..defccf56824dad3df200a1969ab9ef04654c08be 100644
--- a/modules/_datamanager.py
+++ b/modules/_datamanager.py
@@ -6,6 +6,9 @@ from abc import ABC, abstractmethod
 import pandas as pd
 from loguru import logger
 
+
+
+
 '''
 数据管理器类
 '''
@@ -175,7 +178,7 @@ class DailyWeatherResponseFileSystemManager(ResponseFileSystemManager):
 
 class ExtremeWeatherResponseFileSystemManager(ResponseFileSystemManager):
     '''
-    日常天气响应存到文件系统
+    极限天气响应存到文件系统
     '''
     def __init__(self):
         super().__init__()
@@ -183,3 +186,18 @@ class ExtremeWeatherResponseFileSystemManager(ResponseFileSystemManager):
             Config.DataPackageConfig.EXTREME_WEATHER_RESPONSE_MERGE_TAG: self.merge_to_csv,
         }
         self.response_file_path = Config.DataManagerConfig.FileSystemManagerConfig.ResponseFileSystemManagerConfig.ExtremeWeatherResponseFileSystemManagerConfig.RESPONSE_FILE_PATH
+
+class OilPriceResponseFileSystemManager(ResponseFileSystemManager):
+    '''
+    油价响应存到文件系统
+    '''
+
+    def __init__(self):
+        super().__init__()
+        # 注意: 这里的OIL_PRICE_RESPONSE_MERGE_TAG和OilPriceResponseFileSystemManagerConfig是按现有命名规则假定的常量, 需要在Config中补充定义, 不应沿用极限天气的配置
+        self._callback_func_dict = {
+            Config.DataPackageConfig.OIL_PRICE_RESPONSE_MERGE_TAG: self.merge_to_csv,
+        }
+        self.response_file_path = Config.DataManagerConfig.FileSystemManagerConfig.ResponseFileSystemManagerConfig.OilPriceResponseFileSystemManagerConfig.RESPONSE_FILE_PATH
+
+
\ No newline at end of file
diff --git a/oil_price/oil.py b/oil_price/oil.py
new file mode 100644
index 0000000000000000000000000000000000000000..3c7a21643215aba5f34c29a14ac01055c8f4ab11
--- /dev/null
+++ b/oil_price/oil.py
@@ -0,0 +1,158 @@
+#数据爬取
+#20231020
+
+import requests,datetime,json,os
+from time import sleep
+from lxml import etree
+
+
+#json文件处理: process_choice=0时返回一个可用的URL, 为1或2时返回对应的字典
+def json_process(process_choice):
+
+    process=["oil_web_list","readme","position_name"]
+
+    #打开json库
+    with open(f'{os.getcwd()}\\oil_web_list.json', 'r',encoding = "utf-8") as pf:
+        txt=pf.read()
+        json_dic = json.loads(txt)[process[process_choice]]
+
+    #调用oil_web_list:
+    if process_choice==0:
+
+        web=["first","second","third","fourth","fifth"]
+        web_choice=0
+
+        #尝试连接网站, 连接异常或状态码不为200时切换到下一个网站
+        while True:
+            try:
+                response = requests.get(json_dic[web[web_choice]], timeout=30)
+                code = response.status_code
+            except requests.RequestException:
+                code = None
+            if code==200:
+                web_url=json_dic[web[web_choice]]
+                print("Connect successfully:\t"+web_url)
+                break
+            else:
+                if web_choice<4:
+                    web_choice+=1
+                else:
+                    web_choice=0
+                    sleep(60)    #五个网站都不可用时, 等待一分钟后再从头尝试
+        return str(web_url)
+
+    #调用readme:
+    if process_choice==1:
+        return json_dic
+
+    #调用position_name:
+    if process_choice==2:
+        return json_dic
+
+#使用xpath解析网页元素, 输出一个以省名为键、以五个单元格文本组成的列表为值的字典
+def analyze_with_xpath(file):
+    with open(file,'r',encoding="utf-8") as pf:
+        r= pf.read()
+        tree = etree.HTML(r)
+        bows_values = tree.xpath('//td/text()')
+        print("bows_values:")
+        print(bows_values)
+
+        #每五个单元格打包成一组
+        bows_values_packaged=[]
+        for i in range(len(bows_values)//5):#组的序号
+            package=[bows_values[i*5],bows_values[i*5+1],bows_values[i*5+2],bows_values[i*5+3],bows_values[i*5+4]]
+            bows_values_packaged.append(package)
+        print("bows_values_packaged:")
+        print(bows_values_packaged)
+
+        #将json文件当中“position_name”的键名转化成bows_keys列表的值
+        bows={}
+        bows_keys=list(json_process(2))
+        for _ in range(3):
+            bows_keys.pop()    #去掉最后三个键(台湾, 香港, 澳门)
+        for i,bow_key in enumerate(bows_keys):#打包的序号
+            bows[bow_key]=bows_values_packaged[i]
+        print('解析已完成')
+
+    return bows
+
+#更改工作地址
+def change_position():
+    retval = os.getcwd()
+    print(retval)
+    path="C:\\VPO\\oilData"
+    os.makedirs(path, exist_ok=True)    #目录已存在时不再报错
+    if retval!=path:
+        os.chdir(path)
+        print(os.getcwd())
+
+#时间调用示例
+def time_test():
+    # 获取当前日期和时间
+    now = datetime.datetime.now()
+    # 格式化日期和时间
+    formatted_date = now.strftime("%Y-%m-%d")
+    formatted_time = now.strftime("%H:%M:%S")
+    #格式化后的日期: 2023-06-16
+    #格式化后的时间: 16:32:00
+
+#爬取: 调用json_process取得可用网站, 成功时返回一个列表, 第一个值是爬取内容(文本), 第二个值是状态码; 失败时返回None
+def climb():
+    web_url=json_process(0)
+    if web_url == "http://youjia.10260.com/chaiyou/":
+        response = requests.get(web_url, timeout=30)
+        code= response.status_code
+        print("爬取成功")
+        return [response.text,code]
+    else:
+        #其余备用网站的页面解析尚未实现, 暂时视为失败
+        print("Error")
+        return None
+
+
+#运行部分,是一个循环,每24小时一次
+while True:
+    time = datetime.datetime.now()
+    #change_position()
+
+    #在下一行设置具体时间(精确到分钟), 到达该时间才执行爬取
+    if time.strftime("%H:%M")=="13:11":
+        #创建html文本文件并保存
+        file_name=f"{time.strftime('%Y-%m-%d')}_oil_price"
+        page=climb()
+        if page is None:
+            #本轮爬取失败, 等待下一轮
+            sleep(60)
+            continue
+        f=open(f"{file_name}.html","w+",encoding="utf-8")
+        f.write(page[0])
+
+        #html文本文件出现空行问题,重新读取一遍数据删除空行
+        _f=open(f"_{file_name}.html","w+",encoding="utf-8")
+        f.seek(0)
+        rl=f.readlines()
+        for i in rl:
+            if i != "\n" :
+                _f.write(i)
+
+        #输出时间
+        print(time)
+
+        f.close()
+        _f.close()
+
+        data=analyze_with_xpath(f"_{file_name}.html")
+        print(data)
+
+        #将数据写入json文件
+        with open(f"{file_name}.json","w",encoding="utf-8") as _f:
+            json.dump(data,_f)
+
+    sleep(60)#防止在同一分钟内重复爬取
+
diff --git a/oil_price/oil_web_list.json b/oil_price/oil_web_list.json
new file mode 100644
index 0000000000000000000000000000000000000000..977fc22308d5cb78a86d166453d31d29a9acc57e
--- /dev/null
+++ b/oil_price/oil_web_list.json
@@ -0,0 +1,13 @@
+{
+    "oil_web_list":{
+        "first":"http://youjia.10260.com/chaiyou/",
+        "second":"https://www.cngold.org/crude/chaiyou.html",
+        "third":"https://www.5waihui.com/oil/cn/",
+        "fourth":"https://www.05348.com/",
+        "fifth":"http://gas.humeup.cn/0haochaiyou/"
+    },
+    "readme":"存放了五个网站,在前一个网站不稳定时,将会切换到后一个网站",
+    "position_name":{
+        "北京": "Beijing", "天津": "Tianjin", "河北": "Hebei", "山西": "Shanxi", "内蒙古": "NeiMenggu", "辽宁": "Liaoning", "吉林": "Jilin", "黑龙江": "Heilongjiang", "上海": "Shanghai", "江苏": "Jiangsu", "浙江": "Zhejiang", "安徽": "Anhui", "福建": "Fujian", "江西": "Jiangxi", "山东": "Shandong", "河南": "Henan", "湖北": "Hubei", "湖南": "Hunan", "广东": "Guangdong", "广西": "Guangxi", "海南": "Hainan", "重庆": "Chongqing", "四川": "Sichuan", "贵州": "Guizhou", "云南": "Yunnan", "西藏": "Xizang", "陕西": "Shaanxi", "甘肃": "Gansu", "青海": "Qinghai", "宁夏": "Ningxia", "新疆": "Xinjiang", "台湾": "Taiwan", "香港": "HongKong", "澳门": "Macau"
+    }
+}
\ No newline at end of file
diff --git a/t1.txt b/t1.txt
deleted file mode 100644
index 2420738faa709e77ee884c4ecd4490b0009da94c..0000000000000000000000000000000000000000
--- a/t1.txt
+++ /dev/null
@@ -1 +0,0 @@
-asdfghjk
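
Note on the OilPriceCrawler added above: its request_and_process_data is still the template stub (is_request_success = False), so every oil-price data package currently takes the failure branch. Below is a minimal sketch of how the method could be filled in, mirroring the working VegetableCrawler in the same module and reusing the imports already present in modules/_crawler.py; the column names and Config attributes are the ones used above, while the plain GET request and the 30-second timeout are assumptions about the target site.

    def request_and_process_data(self):
        # 模仿VegetableCrawler: 请求异常或状态码不为200时返回None, 交给_rerun按try_again_seconds重试
        try:
            response = requests.get(url=self.url, timeout=30)
        except requests.RequestException:
            return None
        if response.status_code != 200:
            return None
        # 与其他爬虫保持一致: '爬取时间'用统一的NOW_TIME_FUNC, '响应文本'存原始页面文本
        result_df = pd.DataFrame(
            {"爬取时间": [Config.CrawlerConfig.NOW_TIME_FUNC()], "响应文本": [response.text]}
        )
        return result_df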
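
The comments in _crawler.py stress that '响应文本' is stored raw and that a dedicated cleaner must later turn it into a fixed-format table. A rough sketch of such a cleaner for the oil-price page follows, reusing the grouping logic of analyze_with_xpath in oil_price/oil.py (five <td> texts per province, province order taken from position_name in oil_web_list.json minus its last three keys). The function name, the 省份 column and the placeholder 值1..值5 column names are illustrative only; the real column meanings depend on the layout of the source page.

import json

import pandas as pd
from lxml import etree


def oil_response_to_dataframe(html_text: str, web_list_path: str = "oil_price/oil_web_list.json") -> pd.DataFrame:
    # Province order: keys of "position_name" without the last three (台湾, 香港, 澳门),
    # exactly as analyze_with_xpath does.
    with open(web_list_path, encoding="utf-8") as fp:
        provinces = list(json.load(fp)["position_name"])[:-3]
    tree = etree.HTML(html_text)
    cells = tree.xpath('//td/text()')
    # Group the cell texts five at a time, one group per province row.
    rows = [cells[i * 5:(i + 1) * 5] for i in range(len(cells) // 5)]
    df = pd.DataFrame(rows[:len(provinces)], columns=[f"值{k}" for k in range(1, 6)])
    df.insert(0, "省份", provinces[:len(df)])
    return df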
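
Each pass of the loop in oil_price/oil.py ends by dumping the parsed dictionary to <date>_oil_price.json, i.e. a mapping from province name to the five scraped cell values for that day. A small example of loading one day's file back for inspection; the file name below is only the pattern the loop produces.

import json

import pandas as pd

with open("2023-11-21_oil_price.json", encoding="utf-8") as fp:
    day = json.load(fp)  # {"北京": [五个单元格文本], "天津": [...], ...}

df = pd.DataFrame.from_dict(day, orient="index")
print(df.head())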