1 Star 0 Fork 0

robertosassu/ra-verifier

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
connection.py 2.66 KB
一键复制 编辑 原始数据 按行查看 历史
robertosassu 提交于 2021-04-06 10:19 . Migrate to python 3
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# connection.py: query the database
#
# Copyright (C) 2014 Politecnico di Torino, Italy
# TORSEC group -- http://security.polito.it
#
# Author: Roberto Sassu <roberto.sassu@polito.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see
# <http://www.gnu.org/licenses/>.
from util import *
try:
import pycassa
db_available = True
except:
db_available = False
class DBConnection(object):
def __init__(self, keyspace = None, host_list = None):
if db_available is False:
return
self.client = pycassa.ConnectionPool(keyspace, host_list,
pool_timeout = -1,
max_retries = -1)
def multiget_query(self, row_keys = [], cf_name = None, distro = "Fedora18",
include_cf_test = False, sort_reverse = False):
query_result = {}
if db_available is False:
return
if distro.startswith('Ubuntu') and cf_name is 'PackagesHistory':
cf_name += 'DEB'
cf = pycassa.ColumnFamily(self.client, cf_name)
try:
query_result = cf.multiget(row_keys, column_reversed = sort_reverse)
except pycassa.NotFoundException as TException:
pass
if include_cf_test == False:
return query_result
cf = pycassa.ColumnFamily(self.client, cf_name + '_test')
try:
query_result_test = cf.multiget(row_keys,
column_reversed = sort_reverse)
merge_dict(query_result, query_result_test)
except pycassa.NotFoundException as TException:
pass
return query_result
def insert(self, platform, cf_name, pathname, digest_type, digest_string):
query_result = {}
if db_available is False:
return
cf = pycassa.ColumnFamily(self.client, cf_name)
try:
cf.insert(platform, {pathname: {digest_type: digest_string}})
except pycassa.NotFoundException as TException:
pass
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/robertosassu/ra-verifier.git
git@gitee.com:robertosassu/ra-verifier.git
robertosassu
ra-verifier
ra-verifier
master

搜索帮助