docker-osm/docker-osmenrich/enrich.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
/***************************************************************************
Docker-OSM Enrich
An enrich database of docker osm.
-------------------
begin : 2019-03-13
email : irwan at kartoza dot com
contributor : Irwan Fathurrahman
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
import gzip
from os import environ, listdir, makedirs, mkdir
from os.path import join, exists, getsize
from sys import exit, stderr
from time import sleep
from urllib import request
import xmltodict
import yaml
from dateutil import parser
from psycopg2 import connect, OperationalError, ProgrammingError
from collections import OrderedDict


class Enrich(object):
mapping_type = {
'point': 'node',
'linestring': 'way',
'polygon': 'way'
}
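    # imposm geometry types mapped to OSM element types: both linestrings
    # and polygons originate from ways (relations are not handled here).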
enriched_column = {
'changeset_id': 'int',
'changeset_version': 'int',
'changeset_timestamp': 'datetime',
'changeset_user': 'string'
}
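    # Enrich columns appended to every mapped table; check_database()
    # creates them on the fly when they are missing.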
latest_diff_file = None
cache_folder = None
out_of_scope_osm_folder = None
def __init__(self):
# Default values which can be overwritten by environment variable.
self.default = {
'TIME': 120,
'POSTGRES_USER': 'docker',
'POSTGRES_PASS': 'docker',
'POSTGRES_DBNAME': 'gis',
'POSTGRES_HOST': 'db',
'POSTGRES_PORT': '5432',
'SETTINGS': 'settings',
'OSM_API_URL': 'https://api.openstreetmap.org/api/0.6/',
'IMPORT_DONE': 'import_done',
'CACHE': 'cache',
'MAX_DIFF_FILE_SIZE': 100000000,
'DBSCHEMA_PRODUCTION': 'public',
'CACHE_MODIFY_CHECK': ''
}
self.mapping_file = None
self.mapping_database_schema = {}
self.postgis_uri = None
self.overwrite_environment()
self.check_settings()
def check_settings(self):
"""Perform various checking.
This will run when the container is starting. If an error occurs, the
container will stop.
"""
# Test files
try:
for f in listdir(self.default['SETTINGS']):
if f.endswith('.yml'):
self.mapping_file = join(self.default['SETTINGS'], f)
except FileNotFoundError:
pass
if not self.mapping_file:
self.error(
'Mapping file *.yml is missing in %s' % self.default['SETTINGS']
)
else:
self.info('Mapping: ' + self.mapping_file)
        # In docker-compose, we should wait for the DB to be ready.
self.check_mapping_file_data()
self.info('The checkup is OK.')
        # enrich cache folder
        cache_folder = join(self.default['CACHE'], 'enrich')
        if not exists(cache_folder):
            makedirs(cache_folder)
        # out_of_scope_osm: ids that were not found in any mapped table
        out_of_scope_osm_folder = join(
            cache_folder, 'out_of_scope_osm')
        if not exists(out_of_scope_osm_folder):
            mkdir(out_of_scope_osm_folder)
        self.out_of_scope_osm_folder = out_of_scope_osm_folder
        self.cache_folder = cache_folder
# check using not found cache for modify
if self.default['CACHE_MODIFY_CHECK'].lower() == 'true':
self.default['CACHE_MODIFY_CHECK'] = True
else:
self.default['CACHE_MODIFY_CHECK'] = False
def get_cache_path(self):
return join(self.cache_folder, 'cache')
def get_cache_file(self):
""" Return path of cache file
return None if not found
"""
if self.cache_folder:
if exists(self.cache_folder):
cache_file = self.get_cache_path()
if exists(cache_file):
return cache_file
return None
def is_non_recognized_id(self, osm_type, osm_id):
""" Return if osm id and type is unrecognized id
"""
if not self.default['CACHE_MODIFY_CHECK']:
return False
if self.out_of_scope_osm_folder:
if exists(
join(self.out_of_scope_osm_folder,
'%s-%s' % (osm_type, osm_id))):
return True
return False
def get_or_create_non_recognized_id(self, osm_type, osm_id):
""" Create file as cache for non recognized id
"""
if not self.default['CACHE_MODIFY_CHECK']:
return
if self.out_of_scope_osm_folder:
filename = join(
self.out_of_scope_osm_folder,
'%s-%s' % (osm_type, osm_id))
if not exists(filename):
try:
f = open(filename, 'w+')
f.close()
except IOError:
self.info('%s can\'t be created' % filename)
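    # The marker files are named '<osm_type>-<osm_id>', e.g. 'node-42'
    # (hypothetical id), so later diff runs can skip ids that were never
    # found in any mapped table.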
    def check_mapping_file_data(self):
        """Convert the YAML mapping file into a dictionary
        that is used for checking the tables in the database.
        """
        self.info('Load Mapping file data.')
        with open(self.mapping_file, 'r') as document:
            mapping_data = yaml.safe_load(document)
try:
for table, value in mapping_data['tables'].items():
try:
                    geometry_type = value['type']
                    try:
                        osm_type = self.mapping_type[geometry_type]
osm_id_column = None
osm_id_column_index = None
columns = ['id']
for index, column in enumerate(value['columns']):
columns.append(column['name'])
try:
if column['type'] == 'id':
osm_id_column = column['name']
osm_id_column_index = index
except KeyError:
pass
columns.extend(self.enriched_column.keys())
                        self.mapping_database_schema['osm_%s' % table] = {
                            'osm_type': osm_type,
                            'osm_id_column': osm_id_column,
                            'osm_id_column_index': osm_id_column_index,
                            'columns': columns
                        }
                    except KeyError:
                        self.info(
                            'Type %s is not yet recognized by enrich.'
                            % geometry_type)
                except KeyError:
                    self.info(
                        'Table %s doesn\'t have a "type" attribute' % table)
        except KeyError:
            self.error(
                'Mapping file %s doesn\'t have a "tables" attribute'
                % self.mapping_file)
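    # Illustration only (a hypothetical mapping entry, not shipped with
    # docker-osm): a YAML table such as
    #   tables:
    #     buildings:
    #       type: polygon
    #       columns:
    #         - {name: osm_id, type: id}
    #         - {name: name, key: name, type: string}
    # is registered as
    #   'osm_buildings': {
    #       'osm_type': 'way',
    #       'osm_id_column': 'osm_id',
    #       'osm_id_column_index': 0,
    #       'columns': ['id', 'osm_id', 'name', 'changeset_id',
    #                   'changeset_version', 'changeset_timestamp',
    #                   'changeset_user']}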
def create_connection(self):
        return connect(
            "dbname='%s' user='%s' host='%s' port='%s' password='%s'" % (
                self.default['POSTGRES_DBNAME'],
                self.default['POSTGRES_USER'],
                self.default['POSTGRES_HOST'],
                self.default['POSTGRES_PORT'],
                self.default['POSTGRES_PASS']))
def check_database(self):
"""Test connection to PostGIS and create the URI."""
connection = self.create_connection()
cursor = connection.cursor()
try:
for table, table_data in self.mapping_database_schema.items():
new_columns_postgis = []
for enrich_key, enrich_type in self.enriched_column.items():
check_column = ''' SELECT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name='%s' and column_name='%s'); ''' % (
table, enrich_key)
cursor.execute(check_column)
                    column_existence = cursor.fetchone()[0]
                    if not column_existence:
if enrich_type == 'int':
new_columns_postgis.append('ADD COLUMN IF NOT EXISTS %s NUMERIC' % enrich_key)
elif enrich_type == 'string':
new_columns_postgis.append(
'ADD COLUMN IF NOT EXISTS %s CHARACTER VARYING (255)' % enrich_key)
elif enrich_type == 'datetime':
new_columns_postgis.append('ADD COLUMN IF NOT EXISTS %s TIMESTAMPTZ' % enrich_key)
if len(new_columns_postgis) > 0:
query = 'ALTER TABLE %s."%s" %s;' % (
self.default['DBSCHEMA_PRODUCTION'], table, ','.join(new_columns_postgis))
cursor.execute(query)
connection.commit()
connection.close()
return True
except (OperationalError, ProgrammingError):
connection.rollback()
connection.close()
return False
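    # For a table missing all enrich columns this issues, e.g.:
    #   ALTER TABLE public."osm_buildings"
    #     ADD COLUMN IF NOT EXISTS changeset_id NUMERIC,
    #     ADD COLUMN IF NOT EXISTS changeset_version NUMERIC,
    #     ADD COLUMN IF NOT EXISTS changeset_timestamp TIMESTAMPTZ,
    #     ADD COLUMN IF NOT EXISTS changeset_user CHARACTER VARYING (255);
    # ('osm_buildings' is a hypothetical table name).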
    @staticmethod
    def info(message):
        print(message)

    @staticmethod
    def error(message):
        stderr.write(message + '\n')
        exit(1)
def overwrite_environment(self):
"""Overwrite default values from the environment."""
for key in list(environ.keys()):
if key in list(self.default.keys()):
self.default[key] = environ[key]
def check_data_on_dict(self, data, key):
""" return data from dict with key
return None if not found
"""
try:
return data[key]
except KeyError:
return None
def get_osm_enrich_new_data(self, from_osm, from_database):
""" Convert data from xml of osm into json
and check if from osm is newer than from database
:param from_osm: Data that got from osm
:type from_osm: dict
:param from_database: Data that in local database
:type from_database: dict
:return: Dictionary of new data that need to be inserted
:rtype: dict
"""
osm_id = self.check_data_on_dict(from_osm, '@id')
row = from_database
new_data = []
if osm_id and row:
allow_updated = False
osm_timestamp = self.check_data_on_dict(from_osm, '@timestamp')
osm_datetime = parser.parse(osm_timestamp).replace(tzinfo=None)
if not row['changeset_timestamp'] or row['changeset_timestamp'] < osm_datetime:
allow_updated = True
if allow_updated:
osm_version = self.check_data_on_dict(from_osm, '@version')
osm_changeset = self.check_data_on_dict(from_osm, '@changeset')
osm_user = self.check_data_on_dict(from_osm, '@user')
self.info('Update for %s' % osm_id)
new_data = {
'changeset_id': osm_changeset,
'changeset_timestamp': osm_datetime,
'changeset_version': osm_version,
'changeset_user': osm_user
}
return new_data
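    # For example (hypothetical values), an element parsed as
    #   {'@id': '42', '@timestamp': '2019-03-13T00:00:00Z',
    #    '@version': '3', '@changeset': '678', '@user': 'mapper'}
    # compared against a row with a NULL changeset_timestamp yields
    #   {'changeset_id': '678', 'changeset_timestamp': <datetime>,
    #    'changeset_version': '3', 'changeset_user': 'mapper'}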
def update_enrich_into_database(self, table_name, osm_id_column, osm_id, new_data):
""" Update new data into data of osm id
:param table_name: Table source of rows
:type table_name: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
:param osm_id: osm id of data
:type osm_id: str
:param new_data: new data that will be updated
:type new_data: Dict
"""
        if not new_data:
            return
        # build the SET clause with placeholders; psycopg2 escapes the values
        sets = []
        values = []
        for field, value in new_data.items():
            sets.append('%s=%%s' % field)
            values.append(value)
        connection = self.create_connection()
        cursor = connection.cursor()
        try:
            query = 'UPDATE %s.%s SET %s WHERE %s=%%s' % (
                self.default['DBSCHEMA_PRODUCTION'], table_name,
                ','.join(sets), osm_id_column)
            cursor.execute(query, values + [osm_id])
            connection.commit()
        except ProgrammingError as e:
            connection.rollback()
            self.info('%s' % e)
        connection.close()
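    # The generated statement looks like (hypothetical values):
    #   UPDATE public.osm_buildings SET changeset_id=%s,... WHERE osm_id=%s
    # with the values handed to psycopg2 as query parameters.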
# THIS PROCESS BELOW IS FOR EMPTY CHANGESET
def update_osm_enrich_from_api_in_batch(
self, osm_ids, osm_type, row_batch, table_name, osm_id_column):
""" Get osm data from OSM API in Batch
:param osm_ids: osm id in list
:type osm_ids: list
:param osm_type: feature type of this osm
:type osm_type: str
:param osm_type: feature type of this osm
:type osm_type: str
:param row_batch: Row data from local database in dictionary
:type row_batch: Dict
:param table_name: Table source of rows
:type table_name: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
"""
if len(osm_ids) == 0:
return
osm_type_on_url = osm_type + 's'
url = join(self.default['OSM_API_URL'], osm_type_on_url)
url += '?%s=%s' % (osm_type_on_url, ','.join(osm_ids))
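        # e.g. https://api.openstreetmap.org/api/0.6/nodes?nodes=1,2,3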
self.info(url)
        try:
            raw_content = request.urlopen(url).read()
            raw_content = xmltodict.parse(raw_content)
            if isinstance(raw_content['osm'][osm_type], list):
                for osm in raw_content['osm'][osm_type]:
                    osm_id = self.check_data_on_dict(osm, '@id')
                    row = self.check_data_on_dict(row_batch, osm_id)
                    new_data = self.get_osm_enrich_new_data(osm, row)
                    self.update_enrich_into_database(
                        table_name, osm_id_column, osm_id, new_data)
            else:
                osm_id = self.check_data_on_dict(
                    raw_content['osm'][osm_type], '@id')
                row = self.check_data_on_dict(row_batch, osm_id)
                new_data = self.get_osm_enrich_new_data(
                    raw_content['osm'][osm_type], row)
                self.update_enrich_into_database(
                    table_name, osm_id_column, osm_id, new_data)
        except Exception as e:
            self.info('%s' % e)
def process_empty_changeset_from_table(self, table_name, table_columns, osm_id_column, osm_type):
""" Processing all data from table
:param table_name: Table source
:type table_name: str
:param table_columns: columns of tables
:type table_columns: list
:param osm_type: feature type of this osm
:type osm_type: str
:param osm_type: feature type of this osm
:type osm_type: str
:param osm_id_column: Column name of osm_id
:type osm_id_column: str
"""
# noinspection PyUnboundLocalVariable
connection = self.create_connection()
cursor = connection.cursor()
row_batch = {}
osm_ids = []
try:
check_sql = ''' select * from %s."%s" WHERE "changeset_timestamp"
IS NULL AND "osm_id" IS NOT NULL ORDER BY "osm_id" ''' % (self.default['DBSCHEMA_PRODUCTION'], table_name)
cursor.execute(check_sql)
row = True
while row:
# do something with row
row = cursor.fetchone()
if row:
row = dict(zip(table_columns, row))
row_batch['%s' % row[osm_id_column]] = row
osm_ids.append('%s' % row[osm_id_column])
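                    # flush in batches of 30 ids per OSM API request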
if len(osm_ids) == 30:
self.update_osm_enrich_from_api_in_batch(
osm_ids, osm_type, row_batch, table_name, osm_id_column)
row_batch = {}
osm_ids = []
self.update_osm_enrich_from_api_in_batch(
osm_ids, osm_type, row_batch, table_name, osm_id_column)
        except ProgrammingError as e:
            connection.rollback()
            self.info('%s' % e)
        connection.close()
def enrich_empty_changeset(self):
"""Enrich database that has empty changeset by using OSM API URL
"""
self.info('Enrich Database with empty changeset')
        for table, table_data in self.mapping_database_schema.items():
            osm_id_column = table_data['osm_id_column']
            osm_type = table_data['osm_type']
            columns = table_data['columns']
            if osm_id_column is not None:
                self.info('Checking data from table %s' % table)
                self.process_empty_changeset_from_table(
                    table, columns, osm_id_column, osm_type)
            else:
                self.info('The osm_id column for %s is unknown.' % table)
# THIS PROCESS BELOW IS FOR CHECKING DIFF FILES
    def enrich_database_from_osm_data(self, osm_data, osm_data_type):
        """ Update the local database from one parsed osm element
        :param osm_data: parsed data of a single osm element from a diff file
        :type osm_data: dict
        :param osm_data_type: feature type of this osm element, e.g. node or way
        :type osm_data_type: str
        """
osm_id = self.check_data_on_dict(
osm_data, '@id')
for table, table_data in self.mapping_database_schema.items():
if osm_data_type == table_data['osm_type']:
# check if this osm is not found on database
if self.is_non_recognized_id(osm_data_type, osm_id):
continue
connection = self.create_connection()
cursor = connection.cursor()
try:
                validate_sql = ''' select * from %s."%s" WHERE "%s"=%s ''' % (
                    self.default['DBSCHEMA_PRODUCTION'], table,
                    table_data['osm_id_column'], osm_id)
cursor.execute(validate_sql)
row = cursor.fetchone()
if row:
row = dict(zip(table_data['columns'], row))
new_data = self.get_osm_enrich_new_data(osm_data, row)
                    self.update_enrich_into_database(
                        table, table_data['osm_id_column'], osm_id, new_data)
else:
# if this id is not found add in cache
self.get_or_create_non_recognized_id(osm_data_type, osm_id)
except Exception as e:
self.info('error when processing %s: %s' % (osm_id, e))
connection.close()
    def enrich_database_from_diff_file(self):
        """Enrich the database from the diff files found in IMPORT_DONE."""
        # check latest diff file
        if self.get_cache_file():
            with open(self.get_cache_file(), 'r') as cache_file:
                self.latest_diff_file = cache_file.read()
# get list diff file
next_latest_diff_file = None
target_folder = self.default['IMPORT_DONE']
self.info('Enrich Database with diff file in %s' % self.default['IMPORT_DONE'])
if not exists(target_folder):
self.info('Folder %s is not ready yet' % target_folder)
return
for filename in sorted(listdir(target_folder)):
try:
if filename.endswith('.gz'):
if not self.latest_diff_file or self.latest_diff_file < filename:
self.info('Processing %s' % filename)
# if it is newest file
# process for getting this
gzip_file = join(target_folder, filename)
                        if getsize(gzip_file) > int(
                                self.default['MAX_DIFF_FILE_SIZE']):
                            self.info('File is too big, skip it')
continue
                        with gzip.open(gzip_file, 'rb') as f:
                            file_content = f.read()
raw_content = xmltodict.parse(file_content)
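                        # xmltodict parses a diff to a dict shaped like:
                        #   {'osmChange': {'modify': [{'node': [...]},
                        #                             {'way': {...}}, ...]}}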
                        try:
                            modify_list = raw_content['osmChange']['modify']
                            for modification in modify_list:
                                for key, value in modification.items():
                                    if not isinstance(value, OrderedDict):
                                        # a list of elements of the same type
                                        for osm_data in value:
                                            self.enrich_database_from_osm_data(
                                                osm_data, key)
                                    else:
                                        # a single element
                                        self.enrich_database_from_osm_data(
                                            value, key)
                        except KeyError:
                            self.info('%s has no modify element' % filename)
if not next_latest_diff_file or next_latest_diff_file < filename:
next_latest_diff_file = filename
except Exception as e:
self.info('Error when processing %s : %s' % (filename, e))
if next_latest_diff_file:
try:
                cache_file = self.get_cache_path()
                with open(cache_file, 'w') as f:
                    f.write(next_latest_diff_file)
except IOError:
self.info('cache file can\'t be created')
    def locate_table(self, name, schema):
        """Check whether a table matching name exists in the given schema."""
connection = self.create_connection()
cursor = connection.cursor()
sql = """ SELECT EXISTS (SELECT 1 AS result from information_schema.tables
where table_name like TEMP_TABLE and table_schema = 'TEMP_SCHEMA'); """
cursor.execute(sql.replace('TEMP_TABLE', '%s' % name).replace('TEMP_SCHEMA', '%s' % schema))
# noinspection PyUnboundLocalVariable
return cursor.fetchone()[0]
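    # e.g. locate_table("'osm_%'", 'public') is truthy once imposm has
    # created the osm_* tables; note the name keeps its SQL quotes because
    # it is spliced into the LIKE clause verbatim.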
    def run(self):
        """Main loop: wait for the imposm tables, then enrich periodically."""
while True:
self.info('Run enrich process')
            osm_tables = self.locate_table(
                "'osm_%'", self.default['DBSCHEMA_PRODUCTION'])
            if not osm_tables:
self.info('Imposm is still running, wait a while and try again')
else:
if self.check_database():
self.enrich_empty_changeset()
self.enrich_database_from_diff_file()
            # sleep before looping again
            self.info('sleeping for %s seconds' % self.default['TIME'])
            sleep(float(self.default['TIME']))


if __name__ == '__main__':
enrich = Enrich()
enrich.run()
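# A minimal sketch of running the enricher outside docker-compose
# (hypothetical values; the defaults above apply when unset):
#   POSTGRES_HOST=localhost POSTGRES_USER=docker POSTGRES_PASS=docker \
#   TIME=60 python enrich.py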