当前位置: 首页 > 面试题库 >

创建数据库连接并维护多个进程(多处理)

程天佑
2023-03-14
问题内容

类似于我发表的另一篇文章,该文章回答了该问题并提出了一个新问题。

回顾:我需要更新空间数据库中的每条记录,在该数据库中,我有一个点数据集覆盖了多边形数据集。对于每个点要素,我想分配一个关键点以使其与其所在的多边形要素相关联。因此,如果我的点“纽约市”位于美国多边形内,并且对于美国的多边形“
GID = 1”,我将为我的点纽约市分配“ gid_fkey = 1”。

好的,这已经通过多处理来实现。我注意到使用此工具可以将速度提高150%,因此可以正常工作。但是我认为这会带来很多不必要的开销,因为每条记录都需要一个数据库连接。

所以这是代码:

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self):        
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')    
    temp = pyCursorX.fetchall()    
    num_job = temp[0]
    num_jobs = num_job[0]

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')    
    cityIdListTuple = pyCursorX.fetchall()

    cityIdListList = []

    for x in cityIdListTuple:
        cityIdList.append(x[0])


    for i in xrange(num_jobs):
        tasks.put(Task(cityIdList[i - 1]))

    for i in xrange(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print result
        num_jobs -= 1

就像我用“时间”模块测量的那样,每次连接看起来在0.3到1.5秒之间。

有没有一种方法可以使每个进程建立数据库连接,然后仅使用city_id
info作为变量,就可以在此打开操作中将其输入到游标查询中?这样,我说了四个进程,每个进程都有一个数据库连接,然后以某种方式删除了city_id。


问题答案:

尝试在Consumer构造函数中隔离连接的创建,然后将其交给执行的Task:

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        self.pyConn.set_isolation_level(0)


    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task(connection=self.pyConn)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self, connection=None):        
        pyConn = connection
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'


 类似资料:
  • 从bugu-mongo 2.11版本开始,支持连接到多个数据库。 在前面的示例代码中,我们都只是连接到一个数据库: //默认的数据库连接 BuguConnection conn = BuguFramework.getInstance().createConnection(); conn.setHost("192.168.0.100"); conn.setPort(27017); conn.setU

  • 问题内容: 背景: 我正在一个将Django与Postgres数据库一起使用的项目。在某些情况下,我们也会使用mod_wsgi,因为我的一些网络搜索都提到了它。在Web表单提交中,Django视图启动了一个需要大量时间的工作(比用户希望等待的时间还长),因此我们通过后台的系统调用来启动该工作。现在正在运行的作业需要能够读取和写入数据库。因为这项工作需要很长时间,所以我们使用多重处理来并行运行它的各

  • PHP interbase中的Noob。 我正在做一个项目,我需要将数据保存在两个独立的数据库中。我使用默认的MySQL数据库,而另一个使用firebird。已下载此库 这是我的数据库。配置文件夹中的php。 需要多个数据库的函数 模型 我对create_purchaseorder函数没有问题,但是当我运行“save”函数时,它会给我这个错误 致命错误:调用未定义的函数ibase_connect(

  • 本文向大家介绍多维数据库,包括了多维数据库的使用技巧和注意事项,需要的朋友参考一下 多维数据库主要用于OLAP(在线分析处理)和数据仓库。它们可用于向用户显示多维数据。 多维数据库是从多个关系数据库创建的。关系数据库允许用户以查询形式访问数据,而多维数据库则允许用户提出与业务或市场趋势有关的分析性问题。 多维数据库使用MOLAP(多维在线分析处理)来访问其数据。它们允许用户通过相当快地生成和分析数

  • 问题内容: 我有一个使用在不同地理位置的四个数据库的应用程序。所有数据库都包含相同的表,只有数据库名称根据位置而不同。我必须在应用程序中创建一些报告,这些报告使用每个数据库中的数据。从Java应用程序创建那些数据库连接的正确方法是什么,是否有适合我使用的适合此任务的设计模式? 问题答案: 由于您没有任何的标记这个你的问题,,,,我假设你正在处理普通的JDBC。 话虽如此,我建议您有一个DAO层来处

  • 我们设置了开发环境,以便Hibernate每次启动我们的应用程序时都会创建一个新的空数据库: 这适用于部署在服务器上的单个应用程序,但在我们的开发环境中,我们通常部署连接到同一数据库的多个应用程序。它们都使用来自应用程序服务器的相同数据源。 问题在于,当它们在 JBoss AS7 中启动时,它们是并行部署的,因此两个应用程序服务器都尝试同时创建表。我们得到这样的东西(带有匿名的表格和列名): 这些

  • 我正在开发一个查询多个数据库的监控插件。我想使用HikariCP来保持连接打开,但我不知道如何实例化连接池。 HikariCP是否只使用一个池来存储多个数据库?或者一个数据库只有一个池,我的责任是实例化我将使用的数据库中的尽可能多的池。

  • 问题内容: 我有三个表:,,和。 该表包含类别的名称和其类别的ID 。 在包含两列:为这篇文章的ID和该职位的类别的ID。 该表包含有关后多列-如,等 我在网址中有一个名为parent_id的变量,它对应于一个类别。我想列出所有属于parent_id值类别的帖子(而不是类别)。 例如,假设parent_id值为5。每个帖子可能属于ID为20的类别,但该类别属于父类别(其ID为5)。我想列出所有属于