| @ -0,0 +1,5 @@ | |||
| **/__pycache__ | |||
| config/config.yml | |||
| tests/data/* | |||
| @ -0,0 +1,2 @@ | |||
| --- | |||
| @ -0,0 +1,18 @@ | |||
| PY=python3 | |||
| CONFIG=config/config.yml | |||
| # avoid doing anything when typing "make" since there's nothing to build | |||
| dummy: | |||
| run: ${CONFIG} | |||
| ${PY} -m ptaimport $^ | |||
| test: | |||
| ${PY} -m unittest discover | |||
| .PHONY: dummy run test | |||
| # if running config is not already written, copy the example one | |||
| ${CONFIG}: config/example.yml | |||
| cp $< $@ | |||
| @ -0,0 +1,22 @@ | |||
| from .config import Config | |||
| from .passwords import Manager | |||
| def main(): | |||
| config_file = "config/config.yml" #TODO: take as arg | |||
| with open(config_file) as f: | |||
| config = Config( f ) | |||
| (ok, err) = config.is_correct() | |||
| if not ok: | |||
| print(f"Configuration error: {err}") | |||
| exit(255) | |||
| for data_import in config.data_imports(): | |||
| print(data_import) | |||
| #------------------------------------------------------------------------------- | |||
| if __name__ == "__main__": | |||
| main() | |||
| @ -0,0 +1,65 @@ | |||
| import yaml | |||
| from typing import List, Tuple | |||
| import ptaimport.passwords as pwd | |||
| import ptaimport.data_importers as importers | |||
| class Config: | |||
| def __init__( self, yaml_data: str ): | |||
| self.data = yaml.safe_load(yaml_data) | |||
| def is_correct( self ) -> Tuple[bool, str]: | |||
| """ | |||
| Verifies that the configuration is correct | |||
| Returns: | |||
| bool: true if the config is correct | |||
| str: error message to display to the user | |||
| """ | |||
| ok = True | |||
| errors = [] | |||
| if "passwords" not in self.data: | |||
| errors.append('Missing "passwords" section') | |||
| else: | |||
| (pwd_ok, pwd_err) = pwd.verify( | |||
| self.data["passwords"], | |||
| "passwords" | |||
| ) | |||
| ok &= pwd_ok | |||
| errors = errors + pwd_err | |||
| if "imports" not in self.data: | |||
| errors.append('missing "imports" section') | |||
| elif type(self.data["imports"]) is not list: | |||
| errors.append('"imports" section is not a list') | |||
| else: | |||
| for i, importer in enumerate(self.data["imports"]): | |||
| (imp_ok, imp_err) = importers.verify( | |||
| importer, | |||
| f"imports[{i}]" | |||
| ) | |||
| ok &= imp_ok | |||
| errors = errors + imp_err | |||
| return (ok, "\n".join(errors)) | |||
| def passwords( self ) -> pwd.PasswordManager: | |||
| """ | |||
| Returns the password manager defined in the config | |||
| """ | |||
| return pwd.Manager( self.data["passwords"] ) | |||
| def data_imports( self ) -> List[importers.DataImporter]: | |||
| """ | |||
| Returns the list of data importers defined in the config | |||
| """ | |||
| ret = [] | |||
| for config in self.data["imports"]: | |||
| ret.append( importers.Importer( | |||
| config | |||
| ) ) | |||
| return ret | |||
| @ -0,0 +1,59 @@ | |||
| from typing import List, Tuple | |||
| class DataImporter( object ): | |||
| """ | |||
| An abstract data importer. | |||
| Instanciate concrete instances via the Importer() function | |||
| """ | |||
| def retrieve( self ) -> List[str]: | |||
| """ | |||
| Retrieve a list of transactions | |||
| """ | |||
| return [] | |||
| from .mock import MockDataImporter | |||
| from .modo import ModoDataImporter | |||
| #------------------------------------------------------------------------------- | |||
| def verify( config: dict, path: str = "" ) -> Tuple[bool, List[str]]: | |||
| """ | |||
| Verifies that the configuration is correct | |||
| Args: | |||
| config: dict with password manager configuration | |||
| path: path to the "config" dict in the main configuration file | |||
| Returns: | |||
| bool: true if the config is correct | |||
| List[str]: list of errors if the config is not correct | |||
| """ | |||
| if "import" not in config: | |||
| return (False, [f'Missing "{path}.import" value']) | |||
| if "name" not in config: | |||
| return (False, [f'Missing "{path}.name" value']) | |||
| importer = config["import"] | |||
| for cls in DataImporter.__subclasses__(): | |||
| if importer == cls.name(): | |||
| return cls.verify(config, path) | |||
| return (False, [f'Unknown data importer "{importer}"']) | |||
| #------------------------------------------------------------------------------- | |||
| def Importer( config: dict ) -> DataImporter: | |||
| """ | |||
| Instanciate an Importer instance from a given configuration | |||
| """ | |||
| if "import" not in config: | |||
| raise ValueError | |||
| if "name" not in config: | |||
| raise ValueError | |||
| importer = config["import"] | |||
| for cls in DataImporter.__subclasses__(): | |||
| if importer == cls.name(): | |||
| return cls(config) | |||
| raise ValueError | |||
| @ -0,0 +1,39 @@ | |||
| from . import DataImporter | |||
| from typing import List, Tuple | |||
| class MockDataImporter( DataImporter ): | |||
| """ | |||
| Mock data importer, does not retrieve any data | |||
| Only useful for testing | |||
| """ | |||
| @classmethod | |||
| def name( cls ): | |||
| return "mock" | |||
| @classmethod | |||
| def verify( cls, config: dict, path: str ) -> Tuple[bool, List[str]]: | |||
| """ | |||
| Verifies that the configuration is correct | |||
| Args: | |||
| config: dict with password manager configuration | |||
| path: path to the "config" dict in the main configuration file | |||
| Returns: | |||
| bool: true if the config is correct | |||
| List[str]: list of errors if the config is not correct | |||
| """ | |||
| return (True, []) | |||
| def __init__( self, config: dict, **kwargs ): | |||
| pass | |||
| def retrieve( self ) -> List[str]: | |||
| return [] | |||
| @ -0,0 +1,130 @@ | |||
| from . import DataImporter | |||
| import re | |||
| import requests | |||
| from typing import Tuple, List | |||
| from bs4 import BeautifulSoup | |||
| class ModoDataImporter( DataImporter ): | |||
| """ | |||
| Modo (https://modo.coop/) data importer. | |||
| Automatically connects to your account and retrieve transactions | |||
| """ | |||
| @classmethod | |||
| def name( cls ): | |||
| return "modo" | |||
| @classmethod | |||
| def verify( cls, config: dict, path: str ) -> Tuple[bool, List[str]]: | |||
| """ | |||
| Verifies that the configuration is correct | |||
| Args: | |||
| config: dict with password manager configuration | |||
| path: path to the "config" dict in the main configuration file | |||
| Returns: | |||
| bool: true if the config is correct | |||
| List[str]: list of errors if the config is not correct | |||
| """ | |||
| ok = True | |||
| errors = [] | |||
| if "user_id" not in config: | |||
| ok = False | |||
| errors.append(f'Missing value "{path}.user_id"') | |||
| if "password_id" not in config: | |||
| ok = False | |||
| errors.append(f'Missing value "{path}.password_id"') | |||
| return (ok, errors) | |||
| def __init__( self, config: dict, **kwargs ): | |||
| self.name = str(config["name"]) | |||
| self.user_id = str(config["user_id"]) | |||
| self.pwd_id = str(config["password_id"]) | |||
| self.regex_usage = re.compile("[A-Z][a-z]+ [0-9]{4} usage details") | |||
| self.regex_money = re.compile("-?\$[0-9]+\.[0-9]{2}") | |||
| self.regex_membership_fee = re.compile("annual membership fee") | |||
| self.regex_trip = re.compile("On .*, you drove ([0-9]+) km") | |||
| def retrieve( self ) -> List[str]: | |||
| """ | |||
| Retrieves the list of transactions from modo website | |||
| """ | |||
| raise NotImplementedError #FIXME: actually implement | |||
| def _parse_invoice( self, html: str ) -> List[str]: | |||
| """ | |||
| Parses an HTML modo invoice | |||
| This might be quite brittle and require maintenance if the modo | |||
| website ever change | |||
| """ | |||
| data = BeautifulSoup(html, "html.parser") | |||
| trips = data.find( | |||
| class_="m_invoice" | |||
| ).find( | |||
| name="div", | |||
| class_="m_i_h", | |||
| string=self.regex_usage | |||
| ).find_parent( | |||
| name="div", | |||
| id=re.compile("m_i_tier_[0-9]+") | |||
| ).find_all( | |||
| class_="m_i_detail" | |||
| ) | |||
| ret = [] | |||
| for trip in trips: | |||
| text_tag = trip.find(class_="m_id_text") | |||
| if text_tag is None: | |||
| continue | |||
| amounts = trip.find_all( | |||
| class_="m_id_amount", | |||
| string=self.regex_money | |||
| ) | |||
| if len(amounts) != 3: | |||
| continue | |||
| text = "".join(text_tag.text.splitlines()) | |||
| ret.append( | |||
| amounts[0].text | |||
| + "|" | |||
| + self._parse_text(text) | |||
| ) | |||
| return ret | |||
| def _parse_text( self, text: str ) -> str: | |||
| if self.regex_membership_fee.search(text): | |||
| return "fee" | |||
| trip = self.regex_trip.search(text) | |||
| if trip: | |||
| d = trip.group(1) | |||
| return f"trip|{d}km" | |||
| return "unknown" | |||
| #payload = { | |||
| # 'fMemberNumber': '', | |||
| # 'fPassword': '' | |||
| #} | |||
| #with requests.Session() as s: | |||
| # p = s.post("https://bookit.modo.coop/home/login", data=payload) | |||
| # print(p.text) | |||
| # r = s.get("https://bookit.modo.coop/account/invoices?invoice_id=#") | |||
| # print(r.text) | |||
| @ -0,0 +1,73 @@ | |||
| from dataclasses import dataclass | |||
| from typing import Tuple, List, Optional | |||
| @dataclass | |||
| class Password: | |||
| password: Optional[str] | |||
| login: Optional[str] = None | |||
| #------------------------------------------------------------------------------- | |||
| class PasswordManager( object ): | |||
| """ | |||
| An abstract password manager. | |||
| Instanciate concrete instances via the Manager() function | |||
| """ | |||
| def get( self, identifier: str ) -> Password: | |||
| """ | |||
| Gets a password from the manager. | |||
| This function is overriden by the concrete PasswordManager instances. | |||
| Args: | |||
| identifier: which password to get | |||
| Returns: | |||
| None if the password is not found, | |||
| the password otherwise | |||
| """ | |||
| return Password( None ) | |||
| from .plaintext import PlainTextPasswordManager | |||
| from .gnupass import GNUPassPasswordManager | |||
| #------------------------------------------------------------------------------- | |||
| def verify( config: dict, path: str = "" ) -> Tuple[bool, List[str]]: | |||
| """ | |||
| Verifies that the configuration is correct | |||
| Args: | |||
| config: dict with password manager configuration | |||
| path: path to the "config" dict in the main configuration file | |||
| Returns: | |||
| bool: true if the config is correct | |||
| List[str]: list of errors if the config is not correct | |||
| """ | |||
| if "manager" not in config: | |||
| return (False, [f'Missing "{path}.manager" value']) | |||
| manager = config["manager"] | |||
| for cls in PasswordManager.__subclasses__(): | |||
| if manager == cls.name(): | |||
| return cls.verify(config, path) | |||
| return (False, [f'Unknown password manager "{manager}"']) | |||
| #------------------------------------------------------------------------------- | |||
| def Manager( config: dict ) -> PasswordManager: | |||
| """ | |||
| Instanciates a PasswordManager instance from a given configuration | |||
| """ | |||
| if "manager" not in config: | |||
| raise ValueError | |||
| manager = config["manager"] | |||
| for cls in PasswordManager.__subclasses__(): | |||
| if manager == cls.name(): | |||
| return cls(config) | |||
| raise ValueError | |||
| @ -0,0 +1,66 @@ | |||
| from . import Password, PasswordManager | |||
| import subprocess | |||
| from typing import Tuple, List | |||
| class GNUPassPasswordManager( PasswordManager ): | |||
| """ | |||
| GNU pass password manager | |||
| https://www.passwordstore.org/ | |||
| """ | |||
| @classmethod | |||
| def name( cls ): | |||
| return "pass" | |||
| @classmethod | |||
| def verify( cls, config: dict, path: str ) -> Tuple[bool, List[str]]: | |||
| """ | |||
| Verifies that the configuration is correct | |||
| Args: | |||
| config: dict with password manager configuration | |||
| path: path to the "config" dict in the main configuration file | |||
| Returns: | |||
| bool: true if the config is correct | |||
| List[str]: list of errors if the config is not correct | |||
| """ | |||
| return (True, []) | |||
| def __init__( self, config: dict ): | |||
| # no parameters | |||
| pass | |||
| def get( self, identifier: str ) -> Password: | |||
| """ | |||
| Gets a password from the manager. | |||
| This function is overriden by the concrete PasswordManager instances. | |||
| Args: | |||
| identifier: which password to get | |||
| Returns: | |||
| None if the password is not found, | |||
| the password otherwise | |||
| """ | |||
| result = subprocess.run([ | |||
| "pass", | |||
| identifier | |||
| ], | |||
| capture_output = True | |||
| ) | |||
| lines = result.stdout.decode("utf-8").splitlines() | |||
| if len(lines) == 0: | |||
| return Password( None ) | |||
| ret = Password( lines[0] ) | |||
| for i in range(1, len(lines)): | |||
| fields = lines[i].split(': ', 1) | |||
| if len(fields) != 2: | |||
| continue | |||
| if fields[0] == "login": | |||
| ret.login = str(fields[1]) | |||
| return ret | |||
| @ -0,0 +1,60 @@ | |||
| from . import Password, PasswordManager | |||
| from typing import Tuple, List, Optional | |||
| class PlainTextPasswordManager( PasswordManager ): | |||
| """ | |||
| Passwords provided as plain-text (bad idea, but always useful) | |||
| """ | |||
| @classmethod | |||
| def name( cls ): | |||
| return "plaintext" | |||
| @classmethod | |||
| def verify( cls, config: dict, path: str ) -> Tuple[bool, List[str]]: | |||
| """ | |||
| Verifies that the configuration is correct | |||
| Args: | |||
| config: dict with password manager configuration | |||
| path: path to the "config" dict in the main configuration file | |||
| Returns: | |||
| bool: true if the config is correct | |||
| List[str]: list of errors if the config is not correct | |||
| """ | |||
| ok = True | |||
| errors = [] | |||
| if "passwords" not in config: | |||
| ok = False | |||
| errors.append(f'Missing section "{path}.passwords"') | |||
| elif type(config["passwords"]) is not dict: | |||
| ok = False | |||
| errors.append(f'"{path}.passwords" is not dict') | |||
| return (ok, errors) | |||
| def __init__( self, config: dict ): | |||
| self.passwords = config["passwords"] | |||
| def get( self, identifier: str ) -> Password: | |||
| """ | |||
| Gets a password from the manager. | |||
| This function is overriden by the concrete PasswordManager instances. | |||
| Args: | |||
| identifier: which password to get | |||
| Returns: | |||
| None if the password is not found, | |||
| the password otherwise | |||
| """ | |||
| if identifier not in self.passwords: | |||
| return Password( None ) | |||
| return Password( self.passwords[identifier] ) | |||
| @ -0,0 +1,3 @@ | |||
| beautifulsoup4==4.8.2 | |||
| PyYAML==6.0.1 | |||
| Requests==2.31.0 | |||
| @ -0,0 +1,5 @@ | |||
| import os | |||
| import sys | |||
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) | |||
| import ptaimport | |||
| @ -0,0 +1,112 @@ | |||
| from os.path import exists | |||
| import unittest | |||
| from .context import ptaimport | |||
| from ptaimport import data_importers as imp | |||
| class TestDataImporters( unittest.TestCase ): | |||
| """ | |||
| Tests the data_importers modules functions | |||
| """ | |||
| def setUp( self ): | |||
| # nothing provided | |||
| self.config_none = { | |||
| } | |||
| # name provided, but not import | |||
| self.config_noImport = { | |||
| "name": "test importer" | |||
| } | |||
| # import provided but no name | |||
| self.config_noName = { | |||
| "import": "mock" | |||
| } | |||
| # wrong import provided | |||
| self.config_wrong = { | |||
| "name": "test importer", | |||
| "import": "unknown" | |||
| } | |||
| # correct | |||
| self.config_mock = { | |||
| "name": "test importer", | |||
| "import": "mock" | |||
| } | |||
| def test_verify( self ): | |||
| """ | |||
| Tests the data_importers.verify() function | |||
| """ | |||
| (ok, err) = imp.verify( self.config_none ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| (ok, err) = imp.verify( self.config_noImport ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| (ok, err) = imp.verify( self.config_noName ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| (ok, err) = imp.verify( self.config_wrong ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| (ok, err) = imp.verify( self.config_mock ) | |||
| self.assertTrue(ok) | |||
| self.assertEqual( len(err), 0 ) | |||
| def test_factory( self ): | |||
| """ | |||
| Tests the data_importers.Importer() function | |||
| """ | |||
| self.assertRaises( ValueError, imp.Importer, self.config_none ) | |||
| self.assertRaises( ValueError, imp.Importer, self.config_noImport ) | |||
| self.assertRaises( ValueError, imp.Importer, self.config_noName ) | |||
| self.assertRaises( ValueError, imp.Importer, self.config_wrong ) | |||
| imp.Importer( self.config_mock ) # should not raise | |||
| #------------------------------------------------------------------------------- | |||
| from ptaimport.data_importers.modo import ModoDataImporter | |||
| class TestModoDataImporter( unittest.TestCase ): | |||
| def setUp( self ): | |||
| pass | |||
| # Note: to activate this test, save a modo invoice (HTML) from your account | |||
| # to the "tests/data/" directory (as "tests/data/modo_invoice.html"), and | |||
| # also create a "modo_invoice_expected.txt" file containing the expected | |||
| # transactions to test against | |||
| @unittest.skipUnless( exists("tests/data/modo_invoice.html") | |||
| and exists("tests/data/modo_invoice_expected.txt"), | |||
| "Missing test data" ) | |||
| def test_html_parse( self ): | |||
| modo = ModoDataImporter({ | |||
| "name": "test", | |||
| "user_id": 42, | |||
| "password_id": "none" | |||
| }) | |||
| expected_transactions = [] | |||
| with open("tests/data/modo_invoice_expected.txt") as f: | |||
| expected_transactions = f.readlines() | |||
| actual_transactions = [] | |||
| with open("tests/data/modo_invoice.html") as f: | |||
| actual_transactions = modo._parse_invoice( f.read() ) | |||
| self.assertEqual( len(expected_transactions), len(actual_transactions) ) | |||
| for i in range(0, len(expected_transactions)): | |||
| self.assertEqual( | |||
| expected_transactions[i].strip(), | |||
| actual_transactions[i].strip() | |||
| ) | |||
| @ -0,0 +1,128 @@ | |||
| import unittest | |||
| from .context import ptaimport | |||
| from ptaimport import passwords as pwd | |||
| class TestPasswords( unittest.TestCase ): | |||
| """ | |||
| Tests the passwords module functions | |||
| """ | |||
| def setUp( self ): | |||
| # no manager name given | |||
| self.config_none = { | |||
| } | |||
| # unknown manager name given | |||
| self.config_unknown = { | |||
| "manager": "unknown" | |||
| } | |||
| def test_verify( self ): | |||
| """ | |||
| Tests the passwords.verify() function | |||
| """ | |||
| (ok, err) = pwd.verify( self.config_none ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| (ok, err) = pwd.verify( self.config_unknown ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| def test_factory( self ): | |||
| """ | |||
| Tests the passwords.Manager() function | |||
| """ | |||
| self.assertRaises(ValueError, pwd.Manager, self.config_none ) | |||
| self.assertRaises(ValueError, pwd.Manager, self.config_unknown ) | |||
| #------------------------------------------------------------------------------- | |||
| class TestPlainTextPasswords( unittest.TestCase ): | |||
| """ | |||
| Tests the PlainTextPasswordManager class | |||
| """ | |||
| def setUp( self ): | |||
| # no passwords given | |||
| self.config_none = { | |||
| "manager": "plaintext" | |||
| } | |||
| # not dict "passwords" section | |||
| self.config_wrong = { | |||
| "manager": "plaintext", | |||
| "passwords": "foo" | |||
| } | |||
| # one password given | |||
| self.test_pass_name = "my_password" | |||
| self.test_pass_val = "qwerty1234" | |||
| self.config_pass = { | |||
| "manager": "plaintext", | |||
| "passwords": { | |||
| self.test_pass_name: self.test_pass_val | |||
| } | |||
| } | |||
| def test_verify(self): | |||
| """ | |||
| Tests the PlainTextPasswordManager.verify() method | |||
| """ | |||
| (ok, err) = pwd.verify( self.config_none ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| (ok, err) = pwd.verify( self.config_wrong ) | |||
| self.assertFalse(ok) | |||
| self.assertNotEqual( len(err), 0 ) | |||
| (ok, err) = pwd.verify( self.config_pass ) | |||
| self.assertTrue(ok) | |||
| self.assertEqual( len(err), 0 ) | |||
| def test_pass(self): | |||
| """ | |||
| Tests the PlainTextPasswordManager.get() method | |||
| """ | |||
| manager = pwd.Manager( self.config_pass ) | |||
| pwd_val = manager.get( self.test_pass_name ) | |||
| self.assertIsNotNone( pwd_val ) | |||
| self.assertEqual( pwd_val, pwd.Password(self.test_pass_val) ) | |||
| #------------------------------------------------------------------------------- | |||
| class TestGNUPassPasswords( unittest.TestCase ): | |||
| """ | |||
| Tests the GNUPassPasswordManager class | |||
| Unfortunately, not much can be tested automatically, as any password | |||
| retrieval would trigger user input (and anyway, we cannot know the ground | |||
| truth for tests) | |||
| """ | |||
| def setUp( self ): | |||
| self.config = { | |||
| "manager": "pass" | |||
| } | |||
| def test_create( self ): | |||
| """ | |||
| Tests the creation of a GNU pass manager | |||
| """ | |||
| (ok, err) = pwd.verify( self.config ) | |||
| self.assertTrue(ok) | |||
| self.assertEqual( len(err), 0 ) | |||
| pwd.Manager( self.config ) | |||
| #------------------------------------------------------------------------------- | |||
| if __name__ == "__main__": | |||
| unittest.main() | |||