Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • fair-data-austria-db-repository/fda-services
1 result
Select Git revision
Show changes
Showing
with 438 additions and 871 deletions
#!/bin/bash
# needed for MariaDB Connector/C
apt update && apt install -y curl gcc libmariadb3 libmariadb-dev
python3 -m venv ./dbrepo-analyse-service/venv
source ./dbrepo-analyse-service/venv/bin/activate
PIPENV_PIPFILE=./dbrepo-analyse-service/Pipfile pipenv install --dev
\ No newline at end of file
File deleted
File deleted
File deleted
File deleted
File added
File added
#!/bin/bash
source ./dbrepo-analyse-service/venv/bin/activate
cd ./dbrepo-analyse-service/ && coverage run -m pytest test/test_determine_dt.py test/test_determine_pk.py test/test_s3_client.py --junitxml=report.xml && coverage html --omit="test/*" && coverage report --omit="test/*" > ./coverage.txt
\ No newline at end of file
......@@ -117,19 +117,6 @@ class S3ClientTest(unittest.TestCase):
else:
self.fail('FileNotFoundError not raised')
# @Test
def test_bucket_exists_notExists_fails(self):
# test
try:
S3Client().bucket_exists_or_exit("idnonotexist")
except FileNotFoundError:
pass
except Exception:
self.fail('unexpected exception raised')
else:
self.fail('FileNotFoundError not raised')
if __name__ == '__main__':
unittest.main()
This diff is collapsed.
[report]
omit =
# omit tests
./tests/*
# omit ext lib
./omlib/*
exclude_lines =
if __name__ == .__main__.:
\ No newline at end of file
[html]
directory = htmlcov
No preview for this file type
File deleted
FROM docker.io/bitnami/grafana:10.4.9-debian-12-r0 AS runtime
FROM docker.io/bitnami/grafana:11.5.1 AS runtime
LABEL org.opencontainers.image.authors="martin.weise@tuwien.ac.at"
WORKDIR /app
COPY --chown=grafana:grafana ./dashboards /app/dashboards
COPY --chown=grafana:grafana ./provisioning /etc/grafana/provisioning
COPY --chown=grafana:grafana ./grafana.ini /etc/grafana/grafana.ini
COPY --chown=grafana:grafana ./ldap.toml /etc/grafana/ldap.toml
COPY --chown=grafana:grafana ./dashboards /app/dashboards
COPY --chown=grafana:grafana ./provisioning /etc/grafana/provisioning
COPY --chown=grafana:grafana ./grafana.ini /etc/grafana/grafana.ini
COPY --chown=grafana:grafana ./ldap.toml /etc/grafana/ldap.toml
import logging
from grafana_client.client import GrafanaException
from werkzeug.exceptions import NotFound
from api.dto import Permission
from clients import grafana_client
statistics_row_title = '${table_id}'
def remove_anonymous_read_access(uid: str) -> None:
grafana = grafana_client.connect()
permissions = grafana.dashboard.get_permissions_by_uid(uid)
viewer_role = [permission for permission in permissions if
'permissionName' in permission and permission['permissionName'] != 'View']
if len(viewer_role) == 0:
logging.warning(f'Failed to find permissionName=View')
return None
try:
response = grafana_client.generic_post(f'/api/access-control/dashboards/{uid}/builtInRoles/Viewer',
Permission(permission='').model_dump())
if response.status_code != 200:
raise OSError(f'Failed to remove anonymous read access: {response.content}')
except GrafanaException as e:
raise OSError(f'Failed to remove anonymous read access: {e.message}')
logging.info(f"Removed anonymous read access from dashboard with uid: {uid}")
def update_access(uid: str, username: str, permission: Permission) -> None:
try:
response = grafana_client.generic_get(f'/api/users/lookup?loginOrEmail={username}')
if response.status_code == 404:
raise NotFound(f"Failed to find user: {username}")
if response.status_code != 200:
raise OSError(f"Failed to add access to user: {username}")
grafana_client.generic_post(f"/api/access-control/dashboards/{uid}/users/{response.json()['id']}",
permission.model_dump())
except GrafanaException as e:
logging.error(f'Failed to add access: {e.message}')
logging.info(f"Add access for dashboard with uid: {uid}")
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class Datasource(BaseModel):
uid: str
type: str
id: Optional[int] = None
orgId: Optional[int] = None
name: Optional[str] = None
typeLogoUrl: Optional[str] = None
access: Optional[str] = None
url: Optional[str] = None
user: Optional[str] = None
basicAuth: Optional[bool] = None
withCredentials: Optional[bool] = None
isDefault: Optional[bool] = None
version: Optional[int] = None
readOnly: Optional[bool] = None
jsonData: Optional[dict] = None
secureJsonFields: Optional[dict] = None
basicAuthUser: Optional[str] = None
basicAuthPassword: Optional[str] = None
password: Optional[str] = None
class CreateDatasource(BaseModel):
name: str
type: str
access: str
url: str
basicAuth: bool
version: int
readOnly: bool
jsonData: Optional[dict] = None
secureJsonData: Optional[dict] = None
basicAuthUser: Optional[str] = None
class Permission(BaseModel):
permission: str
class CreateDatasourceRequest(BaseModel):
database_internal_name: str
readonly: bool
type: str
import logging
import os
import requests
from requests import Response
from grafana_client import GrafanaApi
url = os.getenv('DASHBOARD_UI_ENDPOINT', 'http://localhost:3000')
username = os.getenv('SYSTEM_USERNAME', 'admin')
password = os.getenv('SYSTEM_PASSWORD', 'admin')
def connect() -> GrafanaApi:
return GrafanaApi.from_url(url=f'{url}', credential=(username, password))
def generic_get(api_url: str) -> Response:
request_url = url + api_url
logging.debug(f'generic get url={request_url}, auth=({username}, <reacted>)')
return requests.get(request_url, auth=(username, password))
def generic_post(api_url: str, payload: dict) -> Response:
request_url = url + api_url
logging.debug(f'generic post url={request_url}, payload={payload}, auth=({username}, <reacted>)')
return requests.post(request_url, json=payload, auth=(username, password))
import logging
import requests
from dataclasses import dataclass
from typing import List
from flask import current_app
from jwt import jwk_from_pem, JWT
@dataclass(init=True, eq=True)
class User:
username: str
roles: List[str]
class KeycloakClient:
def obtain_user_token(self, username: str, password: str) -> str:
response = requests.post(
f"{current_app.config['AUTH_SERVICE_ENDPOINT']}/realms/dbrepo/protocol/openid-connect/token",
data={
"username": username,
"password": password,
"grant_type": "password",
"client_id": current_app.config["AUTH_SERVICE_CLIENT"],
"client_secret": current_app.config["AUTH_SERVICE_CLIENT_SECRET"]
})
body = response.json()
if "access_token" not in body:
raise AssertionError(f"Failed to obtain user token(s): {response.status_code}")
return response.json()["access_token"]
def verify_jwt(self, access_token: str) -> User:
public_key = jwk_from_pem(str(current_app.config["JWT_PUBKEY"]).encode('utf-8'))
payload = JWT().decode(message=access_token, key=public_key, do_time_check=True)
return User(username=payload.get('client_id'), roles=payload.get('realm_access')["roles"])
Name Stmts Miss Cover
--------------------------------------------------
app.py 210 29 86%
clients/keycloak_client.py 36 9 75%
clients/opensearch_client.py 179 51 72%
--------------------------------------------------
TOTAL 425 89 79%
import logging
import os
from dbrepo.api.dto import Database, Table
from grafana_client.client import GrafanaException
from clients import grafana_client
statistics_row_title = '${table_id}'
base_url = os.getenv('BASE_URL', 'http://localhost')
datasource_uid = os.getenv('JSON_DATASOURCE_NAME', 'dbrepojson0')
def map_link(title: str, url: str) -> dict:
return dict(targetBlank=True,
asDropdown=False,
includeVars=False,
keepTime=False,
tags=[],
type='link',
icon='info',
title=title,
url=url)
def map_statistics_row(dashboard: dict) -> dict | None:
filtered_panels = [panel for panel in dashboard['panels'] if
panel['type'] == 'row' and panel['title'] == statistics_row_title and 'id' in panel]
if len(filtered_panels) > 0 and filtered_panels[0]['id'] is not None:
return filtered_panels[0]
logging.warning(f'Failed to find statistics row id')
return None
def map_links(database: Database) -> [dict]:
links = []
if len(database.identifiers) > 0:
links.append(map_link('Database', f"{base_url}/pid/{database.identifiers[0].id}"))
else:
links.append(map_link('Database', f"{base_url}/database/{database.id}"))
return links
def map_templating(database: Database) -> dict:
options = [dict(selected=False,
text=table.name,
value=str(table.id)) for table in database.tables]
selected = dict(selected=True,
text=[table.name for table in database.tables],
value=[str(table.id) for table in database.tables])
datasource = dict(uid=datasource_uid,
type='yesoreyeram-infinity-datasource')
return dict(list=[dict(description='',
name='table_id',
hide=0,
includeAll=True,
multi=True,
datasource=datasource,
refresh=1,
regex='',
sort=0,
definition='dbrepo-json- (infinity) json',
query=dict(queryType='infinity',
query='',
infinityQuery=dict(format='table',
filters=[],
parser='backend',
refId='variable',
root_selector='',
source='url',
type='json',
url=f"/api/database/{database.id}/table",
columns=[dict(selector='id',
text='value',
type='number'),
dict(
selector='internal_name',
text='name',
type='string')],
url_options=dict(data='',
method='GET'))),
label='Table ID',
skipUrlSync=False,
type='query',
current=selected,
options=options)])
def map_timeseries_panel(database: Database, table: Table) -> dict:
datasource = dict(uid=datasource_uid,
type='yesoreyeram-infinity-datasource')
return dict(
title=table['name'],
type='timeseries',
datasource=datasource,
targets=[dict(datasource=datasource,
format='table',
global_query_id='',
hide=False,
refId='A',
root_selector='',
source='url',
type='json',
url=f"/api/database/{database['id']}/table/{table['id']}",
url_options=dict(data='',
method='GET'))],
gridPos=dict(h=8,
w=12,
x=0,
y=0),
options=dict(legend=dict(displayMode='list',
placement='bottom',
showLegend=True),
tooltip=dict(mode='single',
sort='none')),
fieldConfig=dict(
defaults=dict(color=dict(mode='palette-classic'),
custom=dict(
axisBorderShow=False,
axisCenteredZero=False,
axisColorMode='text',
axisLabel='',
axisPlacement='auto',
barAlignment=0,
drawStyle='line',
fillOpacity=0,
gradientMode='none',
hideFrom=dict(legend=False,
tooltip=False,
viz=False),
insertNulls=False,
lineInterpolation='linear',
lineWidth=1,
pointSize=5,
scaleDistribution=dict(type='linear'),
showPoints='auto',
spanNulls=False,
stacking=dict(group='A',
mode='none'),
thresholdsStyle=dict(mode='absolute')))))
def map_panels(dashboard: dict, database_id: int | None = None) -> [dict]:
datasource = dict(uid=datasource_uid,
type='yesoreyeram-infinity-datasource')
if map_statistics_row(dashboard) is None:
dashboard['panels'].append(dict(collapsed=False,
repeat='table_id',
repeatDirection='h',
title=statistics_row_title,
type='row',
panels=[],
targets=[dict(refId='A',
datasource=datasource)],
gridPos=dict(h=1,
w=24,
x=0,
y=0)))
dashboard['panels'].append(dict(title='Sample',
type='table',
fieldConfig=dict(
defaults=dict(
color=dict(mode='palette-classic'),
custom=dict(axisBorderShow=False,
axisCenteredZero=False,
axisColorMode='text',
axisLabel='',
axisPlacement='auto',
barAlignment=0,
drawStyle='line',
fillOpacity=0,
gradientMode='none',
hideFrom=dict(
legend=False,
tooltip=False,
viz=False),
insertNulls=False,
lineInterpolation='linear',
lineWidth=1,
pointSize=5,
scaleDistribution=dict(
type='linear'),
showPoints='auto',
spanNulls=False,
stacking=dict(group='A',
mode='none'),
thresholdsStyle=dict(
mode='off'))),
overrides=[]),
options=dict(legend=dict(displayMode='list',
placement='bottom',
showLegend=True,
calcs=[]),
tooltip=dict(mode='single',
sort='none')),
targets=[dict(format='json',
columns=[],
datasource=datasource,
filters=[],
global_query_id='',
refId='A',
root_selector='',
source='url',
type='json',
url='/api/database/' + str(
database_id) + '/table/${table_id}/data',
url_options=dict(data='',
method='GET'))],
datasource=datasource,
gridPos=dict(h=4,
w=12,
x=0,
y=0)))
return dashboard['panels']
def find(uid: str):
grafana = grafana_client.connect()
try:
return grafana.dashboard.get_dashboard(uid)
except GrafanaException:
return None
def create(database_name: str, uid: str = '') -> dict:
grafana = grafana_client.connect()
dashboard = dict(uid=uid,
title=f'{database_name} Overview',
tags=['generated', 'dbrepo'],
timezone='browser',
fiscalYearStartMonth=1,
panels=[])
dashboard['panels'] = map_panels(dashboard)
payload = dict(folderUid='',
overwrite=False,
dashboard=dashboard)
dashboard = grafana.dashboard.update_dashboard(payload)
logging.info(f"Created dashboard with uid: {dashboard['uid']}")
return dashboard
def find(uid: str) -> dict | None:
grafana = grafana_client.connect()
try:
return grafana.dashboard.get_dashboard(uid)['dashboard']
except GrafanaException:
return None
def delete(uid: str) -> None:
grafana = grafana_client.connect()
grafana.dashboard.delete_dashboard(uid)
def update(database: Database) -> dict:
grafana = grafana_client.connect()
dashboard = find(database.dashboard_uid)
# update metadata
if len(database.identifiers) > 0 and len(database.identifiers[0].titles) > 0:
dashboard['title'] = database.identifiers[0].titles[0].title
if len(database.identifiers) > 0 and len(database.identifiers[0].descriptions) > 0:
dashboard['description'] = database.identifiers[0].descriptions[0].description
dashboard['links'] = map_links(database)
dashboard['templating'] = map_templating(database)
# update panels
dashboard['panels'] = map_panels(dashboard, database.id)
payload = dict(folderUid='',
overwrite=True,
dashboard=dashboard)
dashboard = grafana.dashboard.update_dashboard(payload)
logging.info(f"Updated dashboard with uid: {dashboard['uid']}")
return dashboard