Skip to content

API Reference

This is the API reference for the mitm_tooling package. It covers the representation of MITM model definitions and MITM data, import/export and transformation, Superset asset generation, and the extraction of MITM data from relational databases (and files).

MITM Definitions

MITMDefinition

Bases: BaseModel

This model represents a MITM metamodel via a set of concepts, their properties, and relations.

Source code in mitm_tooling/definition/definition_representation.py (lines 102–179)
class MITMDefinition(pydantic.BaseModel):
    """
    This model represents a MITM metamodel via a set of concepts, their properties, and relations.

    Terminology used by the accessors below:

    - *main concepts*: the members of `main_concepts`; relations (`concept_relations`) are keyed by these.
    - *abstract concepts*: concepts that have sub concepts, i.e. the keys of `sub_concept_map`.
    - *leaf concepts*: main concepts without sub concepts, plus all sub concepts of main concepts.
    """

    main_concepts: set[ConceptName]
    # target concept of a relation -> its data type (looked up by `resolve_types`)
    base_concepts: dict[ConceptName, MITMDataType]
    # abstract concept -> its sub concepts
    sub_concept_map: dict[ConceptName, set[ConceptName]]
    concept_relations: dict[ConceptName, OwnedRelations]  # only defined on the main_concepts level
    concept_properties: dict[ConceptName, ConceptProperties]  # available for each individual concept

    @pydantic.computed_field()
    @cached_property
    def leaf_concepts(self) -> set[ConceptName]:
        """Main concepts without sub concepts, united with all sub concepts of main concepts."""
        return {c for c in self.main_concepts if c not in self.sub_concept_map} | {
            sc for c in self.main_concepts for sc in self.sub_concept_map.get(c, [])
        }

    @pydantic.computed_field()
    @cached_property
    def abstract_concepts(self) -> set[ConceptName]:
        """Concepts that have sub concepts (the keys of `sub_concept_map`)."""
        return set(self.sub_concept_map)

    @pydantic.computed_field()
    @cached_property
    def parent_concept_map(self) -> dict[ConceptName, ConceptName]:
        """Inverse of `sub_concept_map`: sub concept -> its parent concept."""
        return {sub: c for c, subs in self.sub_concept_map.items() for sub in subs}

    @property
    def inverse_concept_key_map(self) -> dict[str, ConceptName]:
        """Map each concept's `key` property back to the concept name (recomputed on every access)."""
        return {cp.key: c for c, cp in self.concept_properties.items()}

    def get_parent(self, concept: ConceptName) -> ConceptName | None:
        """
        Return the concept that owns the relations of `concept`.

        A main concept is its own parent; a sub concept maps to its parent via `parent_concept_map`.
        Returns None for unknown concepts.
        """
        if concept in self.main_concepts:
            return concept
        elif concept in (pcm := self.parent_concept_map):
            return pcm[concept]
        return None

    def get_leaves(self, concept: ConceptName) -> set[ConceptName] | None:
        """
        Return the leaf concepts represented by `concept`.

        For a concept with sub concepts this is (a copy of) its sub concept set; for a leaf concept it is
        `{concept}`; otherwise None. A fresh set is returned so callers cannot mutate `sub_concept_map`
        (and thereby invalidate the cached `leaf_concepts`) through the result.
        """
        if concept in (scm := self.sub_concept_map):
            return set(scm[concept])
        elif concept in self.leaf_concepts:
            return {concept}
        return None

    def get(self, concept: ConceptName) -> tuple[ConceptProperties | None, OwnedRelations | None]:
        """Return `(get_properties(concept), get_relations(concept))`; either part may be None."""
        a, b = self.get_properties(concept), self.get_relations(concept)
        return a, b

    def get_properties(self, concept: ConceptName) -> ConceptProperties | None:
        """Return the properties of `concept`, or None if it has none."""
        return self.concept_properties.get(concept, None)

    def get_relations(self, concept: ConceptName) -> OwnedRelations | None:
        """Return the relations of `concept`, which are owned by its parent (see `get_parent`), or None."""
        return self.concept_relations.get(self.get_parent(concept), None)

    def get_identity(self, concept: ConceptName) -> dict[RelationName, ConceptName]:
        """
        Return the identity relations of `concept`.

        Raises AttributeError if `concept` has no relations (`get_relations` returns None).
        """
        return self.get_relations(concept).identity

    def resolve_types(self, arg: dict[RelationName, ConceptName]) -> dict[RelationName, MITMDataType]:
        """
        Map each relation's target concept to its data type via `base_concepts`.

        Raises KeyError if a target concept is not in `base_concepts`.
        """
        return {relation_name: self.base_concepts[target_concept] for relation_name, target_concept in arg.items()}

    def resolve_inlined_types(self, concept: ConceptName) -> dict[RelationName, MITMDataType]:
        """Data types of the inline relations of `concept`."""
        return self.resolve_types(self.get_relations(concept).inline)

    def resolve_identity_type(self, concept: ConceptName) -> dict[RelationName, MITMDataType]:
        """Data types of the identity relations of `concept`."""
        return self.resolve_types(self.get_relations(concept).identity)

    def resolve_foreign_types(self, concept: ConceptName) -> dict[RelationName, dict[RelationName, MITMDataType]]:
        """
        Data types of the foreign relations of `concept`, grouped by foreign key name.

        Each foreign-key relation is typed via the identity relation it points to on the
        foreign key's target concept.
        """
        return {
            fk_name: self.resolve_types(
                {
                    name: self.get_relations(fk_info.target_concept).identity[target_name]
                    for name, target_name in fk_info.fk_relations.items()
                }
            )
            for fk_name, fk_info in self.get_relations(concept).foreign.items()
        }

MITM Data Representation

Im/Exporting

ExportableSQLiteExport

Bases: MITMExport

Export a BoundExportable to the specific SQLite file format designed for MITMs.

Source code in mitm_tooling/io/sqlite.py (lines 107–133)
class ExportableSQLiteExport(MITMExport):
    """
    Export a `BoundExportable` to the specific SQLite file format designed for MITMs.
    """

    bound_exportable: BoundExportable

    @classmethod
    def from_exportable(cls, exportable: Exportable, bind: AnyDBBind) -> Self:
        """
        Bind `exportable` to `bind` and wrap it in an SQLite export.

        Without an explicit `exportable.filename`, the default name uses a `.sqlite` extension,
        matching the database file that `write` produces.
        """
        return cls(
            mitm=exportable.mitm,
            bound_exportable=exportable.bind(bind),
            filename=exportable.filename or f'{exportable.mitm}.sqlite',
        )

    def write(self, sink: DataSink, stream_data: bool = False, **kwargs) -> None:
        """
        Write the bound exportable into a new SQLite database at the file path `sink`.

        :param sink: target file path; its parent directory is created if needed
        :param stream_data: forwarded to `insert_exportable`
        :raises MITMIOError: if creating the database or inserting the data fails
        """
        ensure_filepath(sink, test_existence=False)
        ensure_directory_exists(sink)

        engine = None
        try:
            engine = create_sa_engine(AnyUrl(f'sqlite:///{str(sink)}'), poolclass=sa.StaticPool)
            insert_exportable(
                engine, self.bound_exportable.bind, self.bound_exportable.exportable, stream_data=stream_data
            )
        except Exception as e:
            raise MITMIOError(f'Error exporting Exportable to SQLite: {sink}') from e
        finally:
            # StaticPool keeps its single connection (and thus the SQLite file) open until the
            # engine is disposed; release it explicitly instead of waiting for garbage collection.
            if engine is not None:
                engine.dispose()
        return None

FolderExport

Bases: MITMExport

Export MITMData to a folder with the structure of the zipped file format designed for MITMs.

See ZippedExport for more details.

Source code in mitm_tooling/io/folder.py (lines 62–100)
class FolderExport(MITMExport):
    """
    Export `MITMData` to a folder with the structure of the zipped file format designed for MITMs.

    The folder receives a `header.csv` plus one CSV per concept, named after the concept's plural.
    See `ZippedExport` for more details.
    """

    mitm_data: MITMData

    @classmethod
    def from_exportable(cls, exportable: Exportable, bind: AnyDBBind) -> Self:
        """Bind `exportable` to `bind`, materialize its `MITMData` and wrap it in a folder export."""
        materialized = exportable.bind(bind).generate_mitm_data()
        fallback_name = f'{exportable.mitm}/'
        return cls(mitm=exportable.mitm, mitm_data=materialized, filename=exportable.filename or fallback_name)

    @classmethod
    def from_bound_exportable(cls, bound_exportable: BoundExportable) -> Self:
        """Shortcut for `from_exportable` on an already bound exportable."""
        exportable, bind = bound_exportable.exportable, bound_exportable.bind
        return cls.from_exportable(exportable, bind)

    def write(self, sink: DataSink, **kwargs) -> None:
        """
        Write `header.csv` and the concept CSVs into the folder `sink` (normalized to end with '/').

        :raises MITMIOError: if any file cannot be written
        """
        ensure_filepath(sink, test_existence=False)
        sink = ensure_ext(sink, '/', override_ext=True)
        ensure_directory_exists(sink)

        try:
            target_dir = os.path.dirname(sink)
            definition = get_mitm_def(self.mitm)

            header_frame = self.mitm_data.header.generate_header_df()
            write_header_file(header_frame, str(os.path.join(target_dir, 'header.csv')))

            for concept, frame in self.mitm_data:
                csv_name = ensure_ext(definition.get_properties(concept).plural, '.csv')
                csv_path = os.path.join(target_dir, csv_name)
                write_data_file(frame, str(csv_path))
                logger.debug(f'Wrote {len(frame)} rows to {csv_name} (folder export).')
        except Exception as e:
            raise MITMIOError(f'Error exporting MITM Data to folder: {sink}') from e
        return None

FolderImport

Bases: MITMImport

Import a folder with the structure of the zipped file format designed for MITMs.

See ZippedImport for more details.

Source code in mitm_tooling/io/folder.py (lines 23–59)
class FolderImport(MITMImport):
    """
    Import a folder with the structure of the zipped file format designed for MITMs.

    See `ZippedImport` for more details.
    """

    def read_header(self, source: DataSource, **kwargs) -> Header:
        """
        Read only the `Header` (from `header.csv`) of the folder at `source`.

        :raises MITMIOError: if the header cannot be read
        """
        ensure_filepath(source)
        source = ensure_ext(source, '/', override_ext=True)

        try:
            return self.read(source, header_only=True).header
        except Exception as e:
            raise MITMIOError(f'Error reading MITM Header from folder: {source}') from e

    def read(self, source: DataSource, header_only: bool = False, **kwargs) -> MITMData:
        """
        Read the folder at `source` into `MITMData`.

        `header.csv` is required. For every main concept of `self.mitm`, `<plural>.csv` is read if
        present; missing concept files are skipped and unrelated files are ignored.

        :param source: path of the folder (normalized to end with '/')
        :param header_only: if set, skip reading the concept CSVs
        :raises MITMIOError: if `header.csv` is missing or any file cannot be read
        """
        ensure_filepath(source)
        source = ensure_ext(source, '/', override_ext=True)

        try:
            # Escape the folder path so glob metacharacters in it ('*', '?', '[') are matched
            # literally; only the trailing '*.csv' is meant to be a pattern.
            csv_pattern = os.path.join(glob.escape(os.fspath(source)), '*.csv')
            file_names = {os.path.basename(p): p for p in glob.glob(csv_pattern)}
            mitm_def = get_mitm_def(self.mitm)

            # a missing header.csv raises KeyError here, reported as MITMIOError below
            parts = {'header': read_header_file(file_names.pop('header.csv'), normalize=True)}

            if not header_only:
                for concept in mitm_def.main_concepts:
                    fn = ensure_ext(mitm_def.get_properties(concept).plural, '.csv')
                    if (p := file_names.pop(fn, None)) is not None:
                        parts[concept] = read_data_file(
                            p, target_mitm=self.mitm, target_concept=concept, normalize=True
                        )

            return MITMData(header=Header.from_df(parts.pop('header'), self.mitm), concept_dfs=parts)
        except Exception as e:
            raise MITMIOError(f'Error reading MITMData from folder: {source}') from e

MITMDataFramesSQLiteExport

Bases: MITMExport

Directly export MITMDataFrames to the specific SQLite file format designed for MITMs.

Source code in mitm_tooling/io/sqlite.py (lines 136–152)
class MITMDataFramesSQLiteExport(MITMExport):
    """
    Directly export `MITMDataFrames` to the specific SQLite file format designed for MITMs.
    """

    mitm_dataframes: MITMDataFrames

    def write(self, sink: DataSink, **kwargs) -> None:
        """
        Write `mitm_dataframes` into a new SQLite database at the file path `sink`.

        :raises MITMIOError: if creating the database or inserting the data fails
        """
        ensure_filepath(sink, test_existence=False)
        ensure_directory_exists(sink)

        engine = None
        try:
            engine = create_sa_engine(AnyUrl(f'sqlite:///{str(sink)}'), poolclass=sa.StaticPool)
            insert_mitm_dataframes(engine, self.mitm_dataframes)
        except Exception as e:
            raise MITMIOError(f'Error exporting MITMDataFrames to SQLite: {sink}') from e
        finally:
            # StaticPool holds one connection (and the SQLite file) open until the engine is disposed.
            if engine is not None:
                engine.dispose()
        return None

SQLiteExport

Bases: MITMExport

Export MITMData to the specific SQLite file format designed for MITMs.

Source code in mitm_tooling/io/sqlite.py (lines 89–104)
class SQLiteExport(MITMExport):
    """
    Export `MITMData` to the specific SQLite file format designed for MITMs.
    """

    mitm_data: MITMData

    def write(self, sink: DataSink, **kwargs) -> None:
        """
        Write `mitm_data` into a new SQLite database at the file path `sink`.

        :raises MITMIOError: if creating the database or inserting the data fails
        """
        ensure_filepath(sink, test_existence=False)
        ensure_directory_exists(sink)
        engine = None
        try:
            engine = create_sa_engine(AnyUrl(f'sqlite:///{str(sink)}'), poolclass=sa.StaticPool)
            insert_mitm_data(engine, self.mitm_data)
        except Exception as e:
            raise MITMIOError(f'Error exporting MITMData to SQLite: {sink}') from e
        finally:
            # StaticPool holds one connection (and the SQLite file) open until the engine is disposed.
            if engine is not None:
                engine.dispose()
        return None

SQLiteImport

Bases: StreamingMITMImport

Import of the specific SQLite file format designed for MITMs.

Source code in mitm_tooling/io/sqlite.py (lines 28–86)
class SQLiteImport(StreamingMITMImport):
    """
    Import of the specific SQLite file format designed for MITMs.

    Every reader opens the database file through an SQLAlchemy engine (with `StaticPool`) and
    derives the `Header` from the database contents via `mitm_db_into_header`. The classmethods
    `_read_header`, `_read_streaming` and `_read` do the work; the instance methods delegate to them.
    """

    @classmethod
    def read_mitm(cls, source: DataSource):
        """Return the MITM recorded in the header of the SQLite file at `source`."""
        header = cls._read_header(source)
        return header.mitm

    @classmethod
    def _read_header(cls, source: DataSource, **kwargs) -> Header:
        """
        Read only the `Header` from the SQLite file at `source`.

        :raises MITMIOError: if the database cannot be opened or inspected
        """
        ensure_filepath(source)

        try:
            from mitm_tooling.transformation.sql import mitm_db_into_header

            db_url = AnyUrl(f'sqlite:///{source}')
            db_engine = create_sa_engine(db_url, poolclass=sa.StaticPool)
            return mitm_db_into_header(db_engine)
        except Exception as err:
            raise MITMIOError(f'Error reading MITM Header from SQLite: {source}') from err

    @classmethod
    def _read_streaming(cls, source: DataSource, **kwargs) -> StreamingMITMData:
        """
        Read the SQLite file at `source` as `StreamingMITMData`.

        NOTE(review): the engine is intentionally not disposed here, since the returned streaming
        data is generated from it (presumably lazily) — confirm before adding cleanup.

        :raises MITMIOError: if the database cannot be opened or read
        """
        ensure_filepath(source)

        try:
            from mitm_tooling.transformation.sql import mitm_db_into_header, sql_rep_into_exportable

            db_engine = create_sa_engine(AnyUrl(f'sqlite:///{source}'), poolclass=sa.StaticPool)
            header = mitm_db_into_header(db_engine)
            exportable = sql_rep_into_exportable(header, mk_sql_rep_schema(header))
            return exportable.generate_streaming_mitm_data(db_engine)
        except Exception as err:
            raise MITMIOError(f'Error reading MITMData (streamed) from SQLite: {source}') from err

    @classmethod
    def _read(cls, source: DataSource, **kwargs) -> MITMData:
        """
        Read the SQLite file at `source` into `MITMData`.

        :raises MITMIOError: if the database cannot be opened or read
        """
        ensure_filepath(source)

        try:
            from mitm_tooling.transformation.sql import mitm_db_into_header, sql_rep_into_exportable

            db_engine = create_sa_engine(AnyUrl(f'sqlite:///{source}'), poolclass=sa.StaticPool)
            header = mitm_db_into_header(db_engine)
            exportable = sql_rep_into_exportable(header, mk_sql_rep_schema(header))
            return exportable.generate_mitm_data(db_engine)
        except Exception as err:
            raise MITMIOError(f'Error reading MITMData from SQLite: {source}') from err

    def read_header(self, source: DataSource, **kwargs) -> Header:
        """Instance-level entry point; see `_read_header`."""
        return self._read_header(source, **kwargs)

    def read_streaming(self, source: DataSource, **kwargs) -> StreamingMITMData:
        """Instance-level entry point; see `_read_streaming`."""
        return self._read_streaming(source, **kwargs)

    def read(self, source: DataSource, **kwargs) -> MITMData:
        """Instance-level entry point; see `_read`."""
        return self._read(source, **kwargs)

StreamingZippedExport

Bases: MITMExport

Export StreamingMITMData to a streamed zip file in the format designed for MITMs.

See also ZippedExport.

Source code in mitm_tooling/io/zip.py (lines 106–193)
class StreamingZippedExport(MITMExport):
    """
    Export `StreamingMITMData` to a streamed zip file in the format designed for MITMs.

    Concept CSVs are written chunk by chunk. The header entries that accompany each chunk are
    collected along the way and written to `header.csv` after all concept data has been emitted.

    See also `ZippedExport`.
    """

    streaming_mitm_data: StreamingMITMData

    @classmethod
    def from_exportable(cls, exportable: Exportable, bind: AnyDBBind, **kwargs) -> Self:
        """Bind `exportable` to `bind` and wrap its streaming data; `kwargs` go to `generate_streaming_mitm_data`."""
        streaming_mitm_data = exportable.bind(bind).generate_streaming_mitm_data(**kwargs)
        return cls(
            mitm=exportable.mitm,
            streaming_mitm_data=streaming_mitm_data,
            filename=exportable.filename or f'{exportable.mitm}.zip',
        )

    @classmethod
    def from_bound_exportable(cls, bound_exportable: BoundExportable, **kwargs) -> Self:
        """Shortcut for `from_exportable` on an already bound exportable."""
        return cls.from_exportable(bound_exportable.exportable, bound_exportable.bind, **kwargs)

    def write(self, sink: ByteSink, **kwargs) -> None:
        """
        Write the zip archive to `sink`.

        :raises MITMIOError: if anything fails while exporting
        """
        ensure_bytes(sink)

        try:
            mitm_def = get_mitm_def(self.mitm)
            collected_header_entries = []
            with use_bytes_io(sink, expected_file_ext='.zip', mode='wb', create_file_if_necessary=True) as f:
                with zipfile.ZipFile(f, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
                    for c, concept_data in self.streaming_mitm_data:
                        fn = ensure_ext(mitm_def.get_properties(c).plural, '.csv')
                        with zf.open(fn, 'w') as cf:
                            # structure df is written with append=False, the data chunks with append=True
                            write_data_file(concept_data.structure_df, cf, append=False)
                            for df_chunks in concept_data.chunk_iterators:
                                for df_chunk, header_entries in df_chunks:
                                    collected_header_entries.extend(header_entries)
                                    write_data_file(df_chunk, cf, append=True)
                                    logger.debug(f'Wrote {len(df_chunk)} rows to {fn} (streaming export).')

                    # header.csv goes last: its entries are only known once all chunks have been consumed
                    with zf.open('header.csv', 'w') as hf:
                        header_df = Header(
                            mitm=self.mitm, header_entries=frozenset(collected_header_entries)
                        ).generate_header_df()
                        write_header_file(header_df, hf)
        except Exception as e:
            raise MITMIOError(f'Error exporting StreamingMITMData to zip: {sink}') from e
        return None

    def iter_bytes(self, chunk_size: int = 65536) -> Iterable[bytes]:
        """
        Lazily produce the zip archive as byte chunks (requires the optional `stream_zip` package).

        Setup failures (e.g. `stream_zip` not installed) raise `MITMIOError` immediately. Failures
        while the archive is being produced raise `MITMIOError` during iteration. Previously, such
        failures escaped unwrapped, because `stream_zip` only does its work when iterated.

        :param chunk_size: forwarded to `stream_zip`
        """
        try:
            from stat import S_IFREG

            from stream_zip import ZIP_64, stream_zip

            mitm_def = get_mitm_def(self.mitm)
        except Exception as e:
            raise MITMIOError('Error exporting StreamingMITMData to zipfile stream') from e

        collected_header_entries = []

        def files():
            modified_at = datetime.datetime.now()
            mode = S_IFREG | 0o600

            for c, concept_data in self.streaming_mitm_data:
                fn = ensure_ext(mitm_def.get_properties(c).plural, '.csv')

                # default argument binds the current concept_data (avoids late-binding closure issues)
                def concept_file_data(concept_data=concept_data):
                    yield write_data_file(concept_data.structure_df, sink=None, append=False).encode('utf-8')
                    for df_chunks in concept_data.chunk_iterators:
                        for df_chunk, header_entries in df_chunks:
                            collected_header_entries.extend(header_entries)
                            yield write_data_file(df_chunk, sink=None, append=True).encode('utf-8')

                yield fn, modified_at, mode, ZIP_64, concept_file_data()

            # reached only after the consumer has pulled all concept members, so the entries are complete
            header_df = Header(
                mitm=self.mitm, header_entries=frozenset(collected_header_entries)
            ).generate_header_df()
            yield (
                'header.csv',
                modified_at,
                mode,
                ZIP_64,
                (write_header_file(header_df, sink=None).encode('utf-8'),),
            )

        def guarded_chunks():
            try:
                yield from stream_zip(files(), chunk_size=chunk_size)
            except Exception as e:
                raise MITMIOError('Error exporting StreamingMITMData to zipfile stream') from e

        return guarded_chunks()

ZippedExport

Bases: MITMExport

Export MITMData to the specific zip file format designed for MITMs.

Source code in mitm_tooling/io/zip.py (lines 68–103)
class ZippedExport(MITMExport):
    """
    Export `MITMData` to the specific zip file format designed for MITMs.

    The archive holds `header.csv` followed by one CSV per concept, named after the concept's plural.
    """

    mitm_data: MITMData

    @classmethod
    def from_exportable(cls, exportable: Exportable, bind: AnyDBBind) -> Self:
        """Bind `exportable` to `bind`, materialize its `MITMData` and wrap it in a zip export."""
        materialized = exportable.bind(bind).generate_mitm_data()
        archive_name = exportable.filename or f'{exportable.mitm}.zip'
        return cls(mitm=exportable.mitm, mitm_data=materialized, filename=archive_name)

    @classmethod
    def from_bound_exportable(cls, bound_exportable: BoundExportable) -> Self:
        """Shortcut for `from_exportable` on an already bound exportable."""
        exportable, bind = bound_exportable.exportable, bound_exportable.bind
        return cls.from_exportable(exportable, bind)

    def write(self, sink: ByteSink, **kwargs):
        """
        Write the zip archive to `sink`.

        :raises MITMIOError: if anything fails while exporting
        """
        ensure_bytes(sink)
        try:
            definition = get_mitm_def(self.mitm)
            with (
                use_bytes_io(sink, expected_file_ext='.zip', mode='wb', create_file_if_necessary=True) as buffer,
                zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as archive,
            ):
                with archive.open('header.csv', 'w') as header_member:
                    write_header_file(self.mitm_data.header.generate_header_df(), header_member)
                for concept, frame in self.mitm_data:
                    member_name = ensure_ext(definition.get_properties(concept).plural, '.csv')
                    with archive.open(member_name, 'w') as member:
                        write_data_file(frame, member)
                        logger.debug(f'Wrote {len(frame)} rows to {member_name} (in-memory export).')
        except Exception as e:
            raise MITMIOError(f'Error exporting MITMData to zip: {sink}') from e
        return None

ZippedImport

Bases: MITMImport

Import of the specific zip file format designed for MITMs. The data source is expected to be a zipped archive of CSV files. At least a header.csv file is expected. The other CSVs are expected to be named according to the pluralized concept names defined by the specified MITM. The CSVs themselves are expected to be in the format of MITMData.

Source code in mitm_tooling/io/zip.py (lines 29–65)
class ZippedImport(MITMImport):
    """
    Import of the specific zip file format designed for MITMs.
    The data source is expected to be a zipped archive of CSV files.
    At least a `header.csv` file is expected.
    The other CSVs are expected to be named according to the pluralized concept names defined by the specified `MITM`.
    The CSVs themselves are expected to be in the format of `MITMData`.
    """

    def read_header(self, source: DataSource, **kwargs) -> Header:
        """
        Read only the `Header` of the zip archive at `source`.

        :raises MITMIOError: if the header cannot be read
        """
        try:
            data = self.read(source, header_only=True)
            return data.header
        except Exception as e:
            raise MITMIOError(f'Error reading MITM Header from zip: {source}') from e

    def read(self, source: DataSource, header_only: bool = False, **kwargs) -> MITMData:
        """
        Read the zip archive at `source` into `MITMData`.

        Concept CSVs that are missing from the archive are skipped; unrelated members are ignored.

        :param header_only: if set, skip reading the concept CSVs
        :raises MITMIOError: if `header.csv` is missing or any member cannot be read
        """
        try:
            definition = get_mitm_def(self.mitm)
            with use_bytes_io(source, expected_file_ext='.zip', mode='rb') as buffer:
                concept_frames = {}
                with zipfile.ZipFile(buffer, 'r', compression=zipfile.ZIP_DEFLATED) as archive:
                    members = set(archive.namelist())
                    if 'header.csv' not in members:
                        raise MITMSyntacticError('MITM data zip file is missing a header.csv file.')
                    with archive.open('header.csv') as header_member:
                        header_frame = read_header_file(header_member, normalize=True)
                    if not header_only:
                        for concept in definition.main_concepts:
                            member_name = ensure_ext(definition.get_properties(concept).plural, '.csv')
                            if member_name not in members:
                                continue
                            with archive.open(member_name) as member:
                                concept_frames[concept] = read_data_file(
                                    member, target_mitm=self.mitm, target_concept=concept, normalize=True
                                )
                return MITMData(header=Header.from_df(header_frame, self.mitm), concept_dfs=concept_frames)
        except Exception as e:
            raise MITMIOError(f'Error reading MITMData from zip: {source}') from e

read_file(source: FilePath, variant: FileRepresentationVariant = 'zip', mitm: MITM | None = None, header_only: bool = False, **kwargs) -> MITMData | None

Read a file into a MITMData object.

Parameters:

Name Type Description Default
source FilePath

a readable byte or text buffer, or a file path

required
variant FileRepresentationVariant

the file representation variant, defaults to 'zip'

'zip'
mitm MITM | None

the target MITM; if not specified, it is inferred from the source (inference is only supported for the 'zip' and 'sqlite' variants)

None
header_only bool

whether to read only the header, skipping the (potentially large) data files

False
kwargs

any additional keyword arguments to pass to the underlying importer's read() call (or read_header(), if header_only is set)

{}

Returns:

Type Description
MITMData | None

the MITMData object, or None if the import failed

Source code in mitm_tooling/io/interface.py (lines 16–65)
def read_file(
    source: FilePath,
    variant: FileRepresentationVariant = 'zip',
    mitm: MITM | None = None,
    header_only: bool = False,
    **kwargs,
) -> MITMData | None:
    """
    Read a file into a `MITMData` object.

    :param source: a readable byte or text buffer, or a file path
    :param variant: the file representation variant, defaults to 'zip'
    :param mitm: the target MITM, is attempted to be inferred from the file path if not specified
    :param header_only: whether to read only the header, skipping the (potentially large) data files
    :param kwargs: any additional keyword arguments to pass to the underlying `ZippedImport.read()` call
    :return: the `MITMData` object, or None if the import failed
    """

    if not mitm:
        match variant:
            case 'zip':
                mitm = infer_mitm_from_zip_path(source)
            case 'sqlite':
                mitm = infer_mitm_from_sqlite(source)
            case _:
                raise MITMIOError(f'Unsupported file representation variant: {variant}')

    if not mitm:
        logger.error('Attempted to import data with unspecified MitM.')
        return None

    match variant:
        case 'zip':
            io_cls = ZippedImport
        case 'sqlite':
            io_cls = SQLiteImport
        case 'folder':
            io_cls = FolderImport
        case _:
            raise MITMIOError(f'Unsupported file representation variant: {variant}')

    try:
        io = io_cls(mitm=mitm)
        if header_only:
            return MITMData(header=io.read_header(source, **kwargs), concept_dfs={})
        else:
            return io.read(source, **kwargs)
    except MITMIOError as e:
        logger.error(f'Error reading {variant} file "{source}":\n{str(e)}')
    return None

read_folder(source: DataSource, mitm: MITM | None = None, header_only: bool = False, **kwargs) -> MITMData | None

Read a folder into a MITMData object. See read_file() for more details.

Parameters:

Name Type Description Default
source DataSource

the path of the folder

required
mitm MITM | None

the target MITM; it must be specified, since it cannot be inferred for folders

None
header_only bool

whether to read only the header, skipping the (potentially large) data files

False
kwargs

any additional keyword arguments to pass to the underlying FolderImport.read() call (or read_header(), if header_only is set)

{}

Returns:

Type Description
MITMData | None

the MITMData object, or None if the import failed

Source code in mitm_tooling/io/interface.py (lines 147–157)
def read_folder(source: DataSource, mitm: MITM | None = None, header_only: bool = False, **kwargs) -> MITMData | None:
    """
    Read a folder into a `MITMData` object. See `read_file()` for more details.

    :param source: the path of the folder
    :param mitm: the target MITM; it should be specified, since no MITM inference is available for folders
    :param header_only: whether to read only the header, skipping the (potentially large) data files
    :param kwargs: any additional keyword arguments to pass to the underlying `FolderImport.read()` call
        (or `read_header()`, if `header_only` is set)
    :return: the `MITMData` object, or None if the import failed
    """
    return read_file(source, mitm=mitm, header_only=header_only, variant='folder', **kwargs)

read_sqlite(source: DataSource, mitm: MITM | None = None, header_only: bool = False, **kwargs) -> MITMData | None

Read a SQLite file into a MITMData object. See read_file() for more details.

Parameters:

Name Type Description Default
source DataSource

the path of the SQLite file

required
mitm MITM | None

the target MITM; if not specified, it is inferred from the SQLite file

None
header_only bool

whether to read only the header, skipping the (potentially large) data files

False
kwargs

any additional keyword arguments to pass to the underlying SQLiteImport.read() call (or read_header(), if header_only is set)

{}

Returns:

Type Description
MITMData | None

the MITMData object, or None if the import failed

Source code in mitm_tooling/io/interface.py (lines 123–133)
def read_sqlite(source: DataSource, mitm: MITM | None = None, header_only: bool = False, **kwargs) -> MITMData | None:
    """
    Read a SQLite file into a `MITMData` object. See `read_file()` for more details.

    :param source: the path of the SQLite file
    :param mitm: the target MITM; if not specified, it is inferred from the SQLite file
    :param header_only: whether to read only the header, skipping the (potentially large) data files
    :param kwargs: any additional keyword arguments to pass to the underlying `SQLiteImport.read()` call
        (or `read_header()`, if `header_only` is set)
    :return: the `MITMData` object, or None if the import failed
    """
    return read_file(source, mitm=mitm, header_only=header_only, variant='sqlite', **kwargs)

read_zip(source: DataSource, mitm: MITM | None = None, header_only: bool = False, **kwargs) -> MITMData | None

Read a zip file into a MITMData object. See read_file() for more details.

Parameters:

Name Type Description Default
source DataSource

a readable byte or text buffer, or a file path

required
mitm MITM | None

the target MITM, is attempted to be inferred from the file path if not specified

None
header_only bool

whether to read only the header, skipping the (potentially large) data files

False
kwargs

any additional keyword arguments to pass to the underlying ZippedImport.read() call

{}

Returns:

Type Description
MITMData | None

the MITMData object, or None if the import failed

Source code in mitm_tooling/io/interface.py (lines 100–110)
def read_zip(source: DataSource, mitm: MITM | None = None, header_only: bool = False, **kwargs) -> MITMData | None:
    """
    Read a zip file into a `MITMData` object; a thin wrapper around `read_file(..., variant='zip')`.

    :param source: a readable byte or text buffer, or a file path
    :param mitm: the target MITM; if not specified, it is inferred from the file path
    :param header_only: whether to read only the header, skipping the (potentially large) data files
    :param kwargs: any additional keyword arguments to pass to the underlying `ZippedImport.read()` call
        (or `read_header()`, if `header_only` is set)
    :return: the `MITMData` object, or None if the import failed
    """
    options = dict(mitm=mitm, header_only=header_only, variant='zip')
    return read_file(source, **options, **kwargs)

write_file(target: FilePath, mitm_data: MITMData, variant: FileRepresentationVariant = 'zip', **kwargs)

Write mitm_data to a file.

Parameters:

Name Type Description Default
target FilePath

the output file path

required
mitm_data MITMData

the MITMData to write

required
variant FileRepresentationVariant

the file representation variant, defaults to 'zip'

'zip'
Source code in mitm_tooling/io/interface.py (lines 68–97)
def write_file(
    target: FilePath,
    mitm_data: MITMData,
    variant: FileRepresentationVariant = 'zip',
    **kwargs,
):
    """
    Write `mitm_data` to a file.

    :param target: the output file path
    :param mitm_data: the `MITMData` to write
    :param variant: the file representation variant, defaults to 'zip'
    """

    match variant:
        case 'zip':
            io_cls = ZippedExport
        case 'sqlite':
            io_cls = SQLiteExport
        case 'folder':
            io_cls = FolderExport
        case _:
            raise MITMIOError(f'Unsupported file representation variant: {variant}')

    try:
        io = io_cls(mitm=mitm_data.header.mitm, filename=os.path.basename(target), mitm_data=mitm_data)
        io.write(target, **kwargs)
    except MITMIOError as e:
        logger.error(f'Error reading {variant} file "{target}":\n{str(e)}')
    return None

write_folder(target: FilePath, mitm_data: MITMData) -> None

Write mitm_data to a folder. See write_file() for more details.

Parameters:

Name Type Description Default
target FilePath

the output file path

required
mitm_data MITMData

the MITMData to write

required
Source code in mitm_tooling/io/interface.py (lines 160–167)
def write_folder(target: FilePath, mitm_data: MITMData, **kwargs) -> None:
    """
    Write `mitm_data` to a folder. See `write_file()` for more details.

    :param target: the output path
    :param mitm_data: the `MITMData` to write
    :param kwargs: any additional keyword arguments, forwarded to `write_file()` and from there to the exporter
    """
    return write_file(target, mitm_data, variant='folder', **kwargs)

write_sqlite(target: FilePath, mitm_data: MITMData) -> None

Write mitm_data to a SQLite file. See write_file() for more details.

Parameters:

Name Type Description Default
target FilePath

the output file path

required
mitm_data MITMData

the MITMData to write

required
Source code in mitm_tooling/io/interface.py
136
137
138
139
140
141
142
143
144
def write_sqlite(target: FilePath, mitm_data: MITMData, **kwargs) -> None:
    """
    Write `mitm_data` to a SQLite file. See `write_file()` for more details.

    :param target: the output file path
    :param mitm_data: the `MITMData` to write
    :param kwargs: any additional keyword arguments, forwarded to `write_file()` and from there to the exporter
    """

    return write_file(target, mitm_data, variant='sqlite', **kwargs)

write_zip(target: FilePath, mitm_data: MITMData) -> None

Write mitm_data to a zip file. See write_file() for more details.

Parameters:

Name Type Description Default
target FilePath

the output file path

required
mitm_data MITMData

the MITMData to write

required
Source code in mitm_tooling/io/interface.py
113
114
115
116
117
118
119
120
def write_zip(target: FilePath, mitm_data: MITMData, **kwargs) -> None:
    """
    Write `mitm_data` to a zip file. See `write_file()` for more details.

    :param target: the output file path
    :param mitm_data: the `MITMData` to write
    :param kwargs: any additional keyword arguments, forwarded to `write_file()` and from there to the exporter
    """
    return write_file(target, mitm_data, variant='zip', **kwargs)

Intermediate

An intermediate representation of MITM data that was designed by hand, prioritizing human accessibility over performance and efficiency.

Header

Bases: BaseModel

This (immutable) model represents the full type information of a MITM data set.

Source code in mitm_tooling/representation/intermediate/header.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class Header(pydantic.BaseModel):
    """
    This (immutable) model represents the full type information of a MITM data set.

    It consists of the MITM it belongs to and a set of `HeaderEntry` type definitions;
    each `(kind, type_name)` pair may be defined at most once.
    """

    model_config = ConfigDict(frozen=True)

    mitm: MITM
    header_entries: frozenset[HeaderEntry] = pydantic.Field(default_factory=frozenset)

    @pydantic.model_validator(mode='after')
    def consistency_check(self):
        """Reject headers that define the same `(kind, type_name)` pair more than once."""
        distinct_keys = {(he.kind, he.type_name) for he in self.header_entries}
        if len(distinct_keys) != len(self.header_entries):
            raise MITMSyntacticError('Duplicate type definition in header.')
        return self

    @classmethod
    def of(cls, mitm: MITM, *header_entries: HeaderEntry) -> Self:
        """Construct a header for `mitm` from the given entries."""
        return cls(mitm=mitm, header_entries=frozenset(header_entries))

    @classmethod
    def from_df(cls, df: pd.DataFrame, mitm: MITM) -> Self:
        """
        Construct a header from a header-file `DataFrame`, parsing each row with `HeaderEntry.from_row`.

        Uses `cls` so that subclasses get instances of their own type, as the `Self` annotation promises.
        """
        return cls(
            mitm=mitm,
            header_entries=frozenset(HeaderEntry.from_row(row, mitm) for row in df.itertuples(index=False)),
        )

    @property
    def max_k(self) -> int:
        """The largest number of attributes of any entry, or 0 if there are no entries."""
        return max((he.attr_k for he in self.header_entries), default=0)

    def generate_header_df(self) -> pd.DataFrame:
        """
        Render the header as a `DataFrame` in the header-file layout, with enough attribute
        columns for the entry with the most attributes.
        """
        k = self.max_k
        # One entry per (kind, type_name); `consistency_check` already guarantees uniqueness,
        # so this is purely defensive.
        deduplicated = {(he.kind, he.type_name): he for he in self.header_entries}
        rows = [he.to_row() for he in deduplicated.values()]
        return pd.DataFrame(data=rows, columns=mk_header_file_columns(k))

    def get(self, concept: ConceptName, type_name: TypeName) -> HeaderEntry | None:
        """Return the entry for `type_name` of exactly `concept` (not its parent), or None."""
        return self.as_dict.get(concept, {}).get(type_name)

    @cached_property
    def mitm_def(self):
        """The (cached) MITM definition of `self.mitm`."""
        return get_mitm_def(self.mitm)

    @cached_property
    def as_dict(self) -> dict[ConceptName, dict[TypeName, HeaderEntry]]:
        """Entries indexed by their own concept, then by type name."""
        res = defaultdict(dict)
        for he in self.header_entries:
            res[he.concept][he.type_name] = he
        return dict(res)

    @cached_property
    def as_generalized_dict(self) -> dict[ConceptName, dict[TypeName, HeaderEntry]]:
        """Entries indexed by the *parent* of their concept, then by type name."""
        res = defaultdict(dict)
        for he in self.header_entries:
            res[self.mitm_def.get_parent(he.concept)][he.type_name] = he
        return dict(res)

    @cached_property
    def typed_df_columns(self) -> dict[ConceptName, dict[TypeName, tuple[list[str], dict[str, MITMDataType]]]]:
        """For each concept and type, the result of `mk_type_table_columns` for its entry."""
        return {
            c: {tp: mk_type_table_columns(self.mitm, he) for tp, he in tps.items()} for c, tps in self.as_dict.items()
        }

    def __add__(self, other):
        """
        Combine the entries of two headers. The result keeps `self.mitm`.

        Conflicting definitions of the same `(kind, type_name)` fail `consistency_check`.
        Returns `NotImplemented` for non-`Header` operands so Python raises a proper `TypeError`.
        """
        if not isinstance(other, Header):
            return NotImplemented
        # NOTE(review): `other.mitm` is not compared against `self.mitm`.
        return type(self)(mitm=self.mitm, header_entries=self.header_entries | other.header_entries)

HeaderEntry

Bases: BaseModel

This (immutable) model represents a single entry in a Header, i.e., a type definition.

Source code in mitm_tooling/representation/intermediate/header.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class HeaderEntry(pydantic.BaseModel):
    """
    This (immutable) model represents a single entry in a `Header`, i.e., a type definition.

    An entry defines the type `type_name` of `concept` (whose key is `kind`), together with its
    attribute columns; `attributes` and `attribute_dtypes` are parallel tuples of equal length.
    """

    model_config = ConfigDict(frozen=True)

    concept: ConceptName
    kind: str
    type_name: TypeName
    attributes: tuple[ColumnName, ...]
    attribute_dtypes: tuple[MITMDataType, ...]

    @pydantic.model_validator(mode='after')
    def attr_check(self):
        """Ensure every attribute has exactly one data type."""
        if len(self.attributes) != len(self.attribute_dtypes):
            raise MITMSyntacticError('Length of specified attributes and their data types differs.')
        return self

    @classmethod
    def of(
        cls,
        mitm: MITM,
        concept: ConceptName,
        type_name: TypeName,
        *attrs: tuple[ColumnName, MITMDataType],
    ) -> Self:
        """
        Construct an entry for `concept` of `mitm`, taking `kind` from the concept's properties.

        :param attrs: `(attribute_name, data_type)` pairs
        :raises MITMSyntacticError: if `concept` has no properties in the MITM definition
        """
        if (props := get_mitm_def(mitm).get_properties(concept)) is not None:
            attributes, attribute_dtypes = zip(*attrs, strict=False) if attrs else ([], [])
            return cls(
                concept=concept,
                kind=props.key,
                type_name=type_name,
                attributes=tuple(attributes),
                attribute_dtypes=tuple(attribute_dtypes),
            )
        else:
            raise MITMSyntacticError(f'Concept {concept} does not exist in MITM definition {mitm}.')

    @classmethod
    def from_row(cls, row: Sequence[str], mitm: MITM) -> Self:
        """
        Parse a header-file row `[kind, type_name, attr_1, dtype_1, attr_2, dtype_2, ...]` into an entry.

        Attribute/dtype pairs where either value is missing are skipped. This covers the empty
        trailing columns of entries with fewer attributes than the widest entry.
        An empty data type string maps to `MITMDataType.Unknown`.

        :raises MITMTypeError: if `kind` is not a known concept key or a data type is unrecognized
        """
        kind, type_name = row[0], row[1]
        concept = get_mitm_def(mitm).inverse_concept_key_map.get(kind)
        if not concept:
            raise MITMTypeError(f'Encountered unknown concept key: "{kind}".')

        attrs, attr_dts = [], []
        for a, a_dt in zip(row[2::2], row[3::2], strict=False):
            if pd.notna(a) and pd.notna(a_dt):
                attrs.append(a)
                try:
                    mitm_dt = MITMDataType(a_dt.lower()) if a_dt else MITMDataType.Unknown
                except ValueError as e:
                    raise MITMTypeError(f'Encountered unrecognized data type during header import: {a_dt}.') from e
                attr_dts.append(mitm_dt)

        # `cls` (not a hard-coded `HeaderEntry`) so the `Self` return annotation holds for subclasses.
        return cls(
            concept=concept, kind=kind, type_name=type_name, attributes=tuple(attrs), attribute_dtypes=tuple(attr_dts)
        )

    def iter_attr_dtype_pairs(self) -> Iterable[tuple[ColumnName, MITMDataType]]:
        """Iterate over `(attribute_name, data_type)` pairs."""
        return zip(self.attributes, self.attribute_dtypes, strict=False)

    @cached_property
    def attr_k(self) -> int:
        """The number of attributes of this type."""
        return len(self.attributes)

    def to_row(self) -> list[str | None]:
        """Serialize to the header-file row layout `[kind, type_name, attr_1, dtype_1, ...]`."""
        return [self.kind, self.type_name] + list(
            itertools.chain(*zip(self.attributes, map(str, self.attribute_dtypes), strict=False))
        )

    @cached_property
    def attr_name_map(self) -> dict[ColumnName, ColumnName]:
        """Map the anonymous attribute column names produced by `mk_attr_columns` to this type's attribute names."""
        return {a_anon: a for a_anon, a in zip(mk_attr_columns(self.attr_k), self.attributes, strict=False)}

MITMData

Bases: Iterable[tuple[ConceptName, DataFrame]], BaseModel

This model represents MITM data in a semi-compacted form; essentially the proposed csv file format. The individual DataFrames are expected to have fixed columns, corresponding to the type information in the header. In particular, each DataFrame should have the static columns as defined by the concept it belongs to, and additionally a variable number of attribute columns named a_1,a_2,....

By default, it is assumed that the DataFrames are in the "generalized" form, meaning that the keys of the dictionary correspond to main concepts.

Source code in mitm_tooling/representation/intermediate/mitm_data.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class MITMData(Iterable[tuple[ConceptName, pd.DataFrame]], pydantic.BaseModel):
    """
    This model represents MITM data in a semi-compacted form; essentially the proposed csv file format.
    The individual DataFrames are expected to have fixed columns, corresponding to the type information in the `header`.
    In particular, each DataFrame should have the static columns as defined by the `concept` it belongs to,
    and additionally a variable number of attribute columns named `a_1,a_2,...`.

    By default, it is assumed that the DataFrames are in the "generalized" form, meaning that the keys of the dictionary correspond to main concepts.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    header: Header
    concept_dfs: dict[ConceptName, pd.DataFrame] = pydantic.Field(default_factory=dict)

    def __iter__(self):
        # Iterate over `(concept, DataFrame)` pairs instead of pydantic's default field iteration.
        return iter(self.concept_dfs.items())

    def as_generalized(self) -> Self:
        """
        Generalizes the MITMData by concatenating all DataFrames with the same _parent_ concept.
        For example, for a concept hierarchy like:

        - observation
            - measurement
            - event

        The DataFrames for `measurement` and `event` will be concatenated into the DataFrame for `observation`.

        :raises MITMTypeError: if a concept has no parent in the MITM definition
        """
        mitm_def = get_mitm_def(self.header.mitm)
        dfs = defaultdict(list)
        for c, df in self.concept_dfs.items():
            c_ = mitm_def.get_parent(c)
            if not c_:
                raise MITMTypeError(f'Encountered unknown concept key: "{c}".')
            dfs[c_].append(df)
        dfs = {c: pd.concat(dfs_, axis='index', ignore_index=True) for c, dfs_ in dfs.items()}
        return MITMData(header=self.header, concept_dfs=dfs)

    def as_specialized(self) -> Self:
        """
        Specializes the MITMData by splitting all DataFrames into their leaf concepts.
        For example, for a concept hierarchy like:

        - observation
            - measurement
            - event

        The DataFrame for `observation` will be split into `measurement` and `event`.

        :raises MITMTypeError: if a concept, or a value of an abstract concept's `kind` column,
            is unknown to the MITM definition
        """
        mitm_def = get_mitm_def(self.header.mitm)
        dfs = {}
        for c, df in self:
            props = mitm_def.get_properties(c)
            if props is None:
                # `get_properties` returns None for concepts missing from the definition.
                raise MITMTypeError(f'Encountered unknown concept key: "{c}".')
            if props.is_abstract:
                # The `kind` column holds the key of each row's concrete sub concept.
                for sub_c_key, idx in df.groupby('kind').groups.items():
                    try:
                        sub_c = mitm_def.inverse_concept_key_map[str(sub_c_key)]
                    except KeyError:
                        raise MITMTypeError(f'Encountered unknown sub concept key: "{sub_c_key}".') from None
                    dfs[sub_c] = df.loc[idx]
            else:
                dfs[c] = df
        return MITMData(header=self.header, concept_dfs=dfs)

    def as_streaming(self, chunk_size: int | None = 100_000) -> StreamingMITMData:
        """
        Convert to `StreamingMITMData` over the generalized form of this data.

        Each concept DataFrame is split via `chunk_df(..., chunk_size=chunk_size)`. Every chunk is paired
        with the header entries of the types occurring in it. Abstract concepts get one chunk iterator
        per distinct `kind` value.
        """
        from .streaming_mitm_data import StreamingConceptData, StreamingMITMData

        h = self.header
        mitm = h.mitm
        generalized_dict = h.as_generalized_dict
        mitm_def = get_mitm_def(mitm)
        # Number of attribute columns each concept file needs: the widest type of that concept.
        max_ks = {c: max((he.attr_k for he in t_hes.values()), default=0) for c, t_hes in generalized_dict.items()}

        data_sources = {}
        for c, df in self.as_generalized():
            structure_df = pd.DataFrame(columns=mk_concept_file_header(mitm, c, max_ks[c])[0])

            props = mitm_def.get_properties(c)
            typing_col = props.typing_concept
            he_map = generalized_dict[c]

            # Loop variables are bound as defaults so each generator keeps this iteration's values.
            def local_iter(df=df, he_map=he_map, typing_col=typing_col):
                dfs = chunk_df(df, chunk_size=chunk_size)
                for df_chunk in dfs:
                    included_types = df_chunk[typing_col].unique()
                    yield df_chunk, [he_map[str(type_name)] for type_name in included_types]

            if props.is_abstract:
                chunk_iterators = [local_iter(df=df.loc[idx]) for idx in df.groupby('kind').groups.values()]
            else:
                chunk_iterators = [local_iter()]

            data_sources[c] = StreamingConceptData(structure_df=structure_df, chunk_iterators=chunk_iterators)

        return StreamingMITMData(mitm=mitm, data_sources=data_sources)

as_generalized() -> Self

Generalizes the MITMData by concatenating all DataFrames with the same parent concept. For example, for a concept hierarchy like:

  • observation
    • measurement
    • event

The DataFrames for measurement and event will be concatenated into the DataFrame for observation.

Source code in mitm_tooling/representation/intermediate/mitm_data.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def as_generalized(self) -> Self:
    """
    Merge the DataFrames of all concepts that share a _parent_ concept into one DataFrame per parent.

    Example concept hierarchy:

    - observation
        - measurement
        - event

    Here, the `measurement` and `event` DataFrames end up concatenated under `observation`.

    :raises MITMTypeError: if a concept has no parent in the MITM definition
    :return: a new `MITMData` with the same header and one DataFrame per parent concept
    """
    definition = get_mitm_def(self.header.mitm)
    grouped = defaultdict(list)
    for concept, frame in self.concept_dfs.items():
        parent = definition.get_parent(concept)
        if not parent:
            raise MITMTypeError(f'Encountered unknown concept key: "{concept}".')
        grouped[parent].append(frame)

    merged = {}
    for parent, frames in grouped.items():
        merged[parent] = pd.concat(frames, axis='index', ignore_index=True)
    return MITMData(header=self.header, concept_dfs=merged)

as_specialized() -> Self

Specializes the MITMData by splitting all DataFrames into their leaf concepts. For example, for a concept hierarchy like:

  • observation
    • measurement
    • event

The DataFrame for observation will be split into measurement and event.

Source code in mitm_tooling/representation/intermediate/mitm_data.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def as_specialized(self) -> Self:
    """
    Specializes the MITMData by splitting all DataFrames into their leaf concepts.
    For example, for a concept hierarchy like:

    - observation
        - measurement
        - event

    The DataFrame for `observation` will be split into `measurement` and `event`.

    :raises MITMTypeError: if a concept, or a value of an abstract concept's `kind` column,
        is unknown to the MITM definition
    """
    mitm_def = get_mitm_def(self.header.mitm)
    dfs = {}
    for c, df in self:
        props = mitm_def.get_properties(c)
        if props is None:
            # `get_properties` returns None for concepts missing from the definition.
            raise MITMTypeError(f'Encountered unknown concept key: "{c}".')
        if props.is_abstract:
            # The `kind` column holds the key of each row's concrete sub concept.
            for sub_c_key, idx in df.groupby('kind').groups.items():
                try:
                    sub_c = mitm_def.inverse_concept_key_map[str(sub_c_key)]
                except KeyError:
                    raise MITMTypeError(f'Encountered unknown sub concept key: "{sub_c_key}".') from None
                dfs[sub_c] = df.loc[idx]
        else:
            dfs[c] = df
    return MITMData(header=self.header, concept_dfs=dfs)

StreamingConceptData

Bases: BaseModel

This model represents streamable data for a specific concept, including its DataFrame structure (empty df with just column names) and a list of iterators for chunks of instances.

The instance chunks are expected to be tuples of (DataFrame, list[HeaderEntry]) where the DataFrame contains the actual data and the list of HeaderEntry provides metadata about the occurring types.

The outer list of iterators allows for multiple streams of data for the same concept, particularly when constructing the stream ad-hoc without prior knowledge of the contained concepts, e.g., when adding types individually and out-of-order w.r.t. concepts.

Note: Streamed data is assumed to be readable once.

Source code in mitm_tooling/representation/intermediate/streaming_mitm_data.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class StreamingConceptData(pydantic.BaseModel):
    """
    This model represents streamable data for a specific concept, including its DataFrame structure (empty df with just column names) and a list of iterators for chunks of instances.

    The instance chunks are expected to be tuples of `(DataFrame, list[HeaderEntry])` where the DataFrame contains the actual data and the list of `HeaderEntry` provides metadata about the occurring types.

    The outer list of iterators allows for multiple streams of data for the same concept, particularly when constructing the stream ad-hoc without prior knowledge of the contained concepts, e.g., when adding types individually and out-of-order w.r.t. concepts.

    Note: Streamed data is assumed to be readable once.
    """

    # Allows the non-pydantic `pd.DataFrame` field type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Empty DataFrame whose columns describe the concept's table layout.
    structure_df: pd.DataFrame
    # Single-use iterators, each yielding `(chunk_df, header_entries_of_types_in_chunk)` pairs.
    chunk_iterators: list[Iterator[tuple[pd.DataFrame, list[HeaderEntry]]]] = pydantic.Field(default_factory=list)

StreamingMITMData

Bases: Iterable[tuple[ConceptName, StreamingConceptData]], BaseModel

This model represents streamable MITM data as a collection of StreamingConceptData.

By default, it is assumed that the streams are in the "generalized" form, meaning that the keys of the dictionary correspond to main concepts.

Note: Streamed data is assumed to be readable once.

Source code in mitm_tooling/representation/intermediate/streaming_mitm_data.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class StreamingMITMData(Iterable[tuple[ConceptName, StreamingConceptData]], pydantic.BaseModel):
    """
    This model represents streamable MITM data as a collection of `StreamingConceptData`.

    By default, it is assumed that the streams are in the "generalized" form, meaning that the keys of the dictionary correspond to main concepts.

    Note: Streamed data is assumed to be readable once.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    mitm: MITM
    data_sources: dict[ConceptName, StreamingConceptData] = pydantic.Field(default_factory=dict)

    def __iter__(self):
        # Iterate over `(concept, StreamingConceptData)` pairs instead of pydantic's field iteration.
        return iter(self.data_sources.items())

    def as_generalized(self) -> Self:
        """
        Merge the data sources of all concepts that share a parent concept into one data source per parent.

        The chunk iterators are concatenated lazily; no data is consumed.

        :raises MITMTypeError: if a concept has no parent in the MITM definition, or if the data
            sources being merged do not all have equal `structure_df`s
        """
        mitm_def = get_mitm_def(self.mitm)
        combined_data_sources = defaultdict(list)
        for c, ds in self:
            parent = mitm_def.get_parent(c)
            if not parent:
                # Otherwise unknown concepts would silently be grouped under a `None` key.
                raise MITMTypeError(f'Encountered unknown concept key: "{c}".')
            combined_data_sources[parent].append(ds)
        data_sources = {}
        for c, ds_list in combined_data_sources.items():
            structure_dfs = [ds.structure_df for ds in ds_list]
            # Consecutive pairs all equal <=> all structures identical.
            if not all(a.equals(b) for a, b in zip(structure_dfs[:-1], structure_dfs[1:], strict=False)):
                raise MITMTypeError(f'Concept {c} not generalizable in {self} (structure_dfs differ)')

            data_sources[c] = StreamingConceptData(
                structure_df=take_first(structure_dfs),
                chunk_iterators=[it for ds in ds_list for it in ds.chunk_iterators],
            )
        return StreamingMITMData(mitm=self.mitm, data_sources=data_sources)

    def collect(self) -> MITMData:
        """
        Consume all chunk iterators and materialize the data as (generalized) `MITMData`.

        The header is assembled from the header entries reported alongside the chunks.
        Streamed data is assumed to be readable once, so the iterators are exhausted afterwards.
        """
        from .mitm_data import MITMData

        # A set avoids keeping one copy of a type's entry per chunk it occurs in;
        # `Header.of` would deduplicate via frozenset anyway.
        hes = set()
        concept_dfs = {}
        for c, concept_data in self.as_generalized():
            # pd.concat aligns columns by name; the empty `structure_df` goes first so its columns lead.
            dfs = [concept_data.structure_df]
            for df_chunks in concept_data.chunk_iterators:
                for df_chunk, hes_ in df_chunks:
                    dfs.append(df_chunk)
                    hes.update(hes_)
            concept_dfs[c] = pd.concat(dfs, axis='index', ignore_index=True)

        header = Header.of(self.mitm, *hes)
        return MITMData(header=header, concept_dfs=concept_dfs)

Functionality for reading and writing CSV files of the intermediate representation.

read_data_file(source: DataSource, target_mitm: MITM | None = None, target_concept: ConceptName | None = None, normalize: bool = False) -> pd.DataFrame

Read a CSV file into a DataFrame, optionally reindexing the columns as expected in the intermediate representation given the target_concept of the target_mitm.

Parameters:

Name Type Description Default
source DataSource

a readable byte or text buffer, or a file path

required
target_mitm MITM | None

the target MITM

None
target_concept ConceptName | None

the target concept

None
normalize bool

whether to reindex the columns as expected in the intermediate representation

False

Returns:

Type Description
DataFrame

the read DataFrame

Source code in mitm_tooling/representation/file/read.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def read_data_file(
    source: DataSource,
    target_mitm: MITM | None = None,
    target_concept: ConceptName | None = None,
    normalize: bool = False,
) -> pd.DataFrame:
    """
    Load a semicolon-separated CSV file into a `DataFrame`.

    When `normalize` is set and both `target_mitm` and `target_concept` are given, the columns are
    reindexed to the layout the intermediate representation expects for that concept. They are
    then converted to the corresponding data types.

    :param source: a readable byte or text buffer, or a file path
    :param target_mitm: the target MITM
    :param target_concept: the target concept
    :param normalize: whether to reindex the columns as expected in the intermediate representation
    :return: the read `DataFrame`
    """

    with use_for_pandas_io(source) as handle:
        frame = pd.read_csv(handle, sep=';', date_format='%Y-%m-%dT%H:%M:%S.%f%z', low_memory=False)
        if not (normalize and target_mitm and target_concept):
            return frame
        attr_count = guess_k_of_header_df(frame)
        expected_columns, expected_dtypes = mk_concept_file_header(target_mitm, target_concept, attr_count)
        frame = frame.reindex(columns=expected_columns)
        convert_df(frame, expected_dtypes, inplace=True)
        return frame

read_header_file(source: DataSource, normalize: bool = False) -> pd.DataFrame

Read a CSV file into a DataFrame, optionally reindexing the columns as expected in the intermediate representation of a header file.

Parameters:

Name Type Description Default
source DataSource

a readable byte or text buffer, or a file path

required
normalize bool

whether to reindex the columns as expected in the intermediate representation

False

Returns:

Type Description
DataFrame

the read DataFrame

Source code in mitm_tooling/representation/file/read.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def read_header_file(source: DataSource, normalize: bool = False) -> pd.DataFrame:
    """
    Load a semicolon-separated header CSV file into a `DataFrame`.

    With `normalize`, all values are cast to the pandas string dtype. The columns are then
    reindexed to the header-file layout for the guessed number of attributes.

    :param source: a readable byte or text buffer, or a file path
    :param normalize: whether to reindex the columns as expected in the intermediate representation
    :return: the read `DataFrame`
    """

    with use_for_pandas_io(source) as handle:
        header_df = pd.read_csv(handle, sep=';')
        if not normalize:
            return header_df
        attr_count = guess_k_of_header_df(header_df)
        as_strings = header_df.astype(pd.StringDtype())
        return as_strings.reindex(columns=mk_header_file_columns(attr_count))

write_data_file(df: pd.DataFrame, sink: DataSink | None, append: bool = False) -> str | None

Write the DataFrame df to a CSV file. If sink is a FilePath, ensure the directory exists.

Parameters:

Name Type Description Default
df DataFrame

the DataFrame to write

required
sink DataSink | None

a writable byte or text buffer, or a file path

required
append bool

whether the data continues previously written output (e.g., a later chunk), in which case the column header row is skipped

False

Returns:

Type Description
str | None

None, or a string of the CSV-formatted contents if sink is None

Source code in mitm_tooling/representation/file/write.py
19
20
21
22
23
24
25
26
27
28
29
30
31
def write_data_file(df: pd.DataFrame, sink: DataSink | None, append: bool = False) -> str | None:
    """
    Write the `DataFrame` `df` to a CSV file. If `sink` is a `FilePath`, ensure the directory exists.

    :param df: the `DataFrame` to write
    :param sink: a writable byte or text buffer, or a file path
    :param append: whether `df` continues previously written output. If `True`, the column header row is
        skipped and, when `sink` is a file path, the file is opened in append mode instead of being overwritten
    :return: None, or a string of the CSV-formatted contents if `sink` is None
    """

    mode = 'w'
    if isinstance(sink, FilePath):
        ensure_directory_exists(sink)
        if append:
            # With the default mode 'w', pandas would truncate the file and leave only this header-less chunk.
            mode = 'a'
    return df.to_csv(
        sink, header=not append, index=False, sep=';', date_format='%Y-%m-%dT%H:%M:%S.%f%z', mode=mode
    )

write_header_file(df: pd.DataFrame, sink: DataSink | None) -> str | None

Write the DataFrame df to a CSV file. If sink is a FilePath, ensure the directory exists.

Parameters:

Name Type Description Default
df DataFrame

the DataFrame to write

required
sink DataSink | None

a writable byte or text buffer, or a file path

required

Returns:

Type Description
str | None

None, or a string of the CSV-formatted contents if sink is None

Source code in mitm_tooling/representation/file/write.py
 6
 7
 8
 9
10
11
12
13
14
15
16
def write_header_file(df: pd.DataFrame, sink: DataSink | None) -> str | None:
    """
    Serialize the header `DataFrame` `df` as semicolon-separated CSV, always including the column row.

    If `sink` is a `FilePath`, its parent directory is created first when necessary.

    :param df: the `DataFrame` to write
    :param sink: a writable byte or text buffer, or a file path
    :return: None, or a string of the CSV-formatted contents if `sink` is None
    """
    targets_path = isinstance(sink, FilePath)
    if targets_path:
        ensure_directory_exists(sink)
    return df.to_csv(sink, sep=';', index=False, header=True)

DataFrame

A normalized representation of MITM data via pandas data frames. It is most suitable for data analysis and visualization.

MITMDataFrameStream = Iterable[tuple[ConceptName, Iterable[tuple[TypeName, Iterable[pd.DataFrame]]]]] module-attribute

A stream of MITM data frames, where the data is grouped by concept and type, and includes type information (e.g., attributes/columns) next to the data frames. In contrast to StreamingMITMDataFrames, the type information (Header) is not known in advance.

MITMDataFrames

Bases: Iterable[tuple[ConceptName, dict[TypeName, DataFrame]]], BaseModel

This model represents normalized MITM Data as a collection of pandas DataFrames, hierarchically organized by concept and type. It is intended to be used for in-memory representation of normalized MITM Data, e.g., when feeding it into data science packages.

Source code in mitm_tooling/representation/df/mitm_dataframes.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class MITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.DataFrame]]], pydantic.BaseModel):
    """
    Normalized MITM data held in memory as pandas DataFrames, organized first by concept and then by type.
    Intended for in-memory use, e.g., handing normalized MITM data to data science packages.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    header: Header
    dfs: dict[ConceptName, dict[TypeName, pd.DataFrame]]

    def __iter__(self):
        # Yield `(concept, {type_name: DataFrame})` pairs rather than pydantic's field iteration.
        return iter(self.dfs.items())

    def as_streaming(self, chunk_size: int | None = 100_000) -> StreamingMITMDataFrames:
        """
        Wrap every per-type DataFrame in `chunk_df(..., chunk_size=chunk_size)` and return the
        result as `StreamingMITMDataFrames` that share this header.
        """
        from .streaming_mitm_dataframes import StreamingMITMDataFrames

        chunked_iters = {}
        for concept, frames_by_type in self.dfs.items():
            chunked_iters[concept] = {
                type_name: chunk_df(frame, chunk_size=chunk_size) for type_name, frame in frames_by_type.items()
            }
        return StreamingMITMDataFrames(header=self.header, df_iters=chunked_iters)

StreamingMITMDataFrames

Bases: Iterable[tuple[ConceptName, dict[TypeName, DataFrame]]], BaseModel

This model explicitly represents a stream of structured MITM Data via a collection of Iterables. In contrast to the bare MITMDataFrameStream, only the instances are (potentially) streamed, not the type information.

Note: Streamed data is assumed to be readable once.

Source code in mitm_tooling/representation/df/streaming_mitm_dataframes.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class StreamingMITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.DataFrame]]], pydantic.BaseModel):
    """
    A stream of structured MITM data: the type information (`header`) is fully known up front,
    while the instances of each concept/type are provided by (potentially lazy) iterables of DataFrames.
    This is what distinguishes it from the bare `MITMDataFrameStream`.

    Note: Streamed data is assumed to be readable once.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    header: Header
    df_iters: dict[ConceptName, dict[TypeName, Iterable[pd.DataFrame]]]

    def __iter__(self):
        # Yield `(concept, {type_name: df_iterable})` pairs rather than pydantic's field iteration.
        return iter(self.df_iters.items())

    def stream(self) -> MITMDataFrameStream:
        """Lazily expose the data as `(concept, ((type_name, df_iterable), ...))` without header entries."""
        return (
            (concept, ((type_name, frames) for type_name, frames in per_type.items()))
            for concept, per_type in self.df_iters.items()
        )

    def typed_stream(self) -> TypedMITMDataFrameStream:
        """Like `stream()`, but each type tuple also carries its `HeaderEntry` from `header`."""
        entries = self.header.as_dict
        return (
            (concept, ((type_name, entries[concept][type_name], frames) for type_name, frames in per_type.items()))
            for concept, per_type in self.df_iters.items()
        )

    def collect(self) -> MITMDataFrames:
        """Consume the stream and materialize it as `MITMDataFrames`."""
        return collect_typed_mitm_dataframe_stream(self.header.mitm, self.typed_stream())

Relational/SQL

A relational representation of MITM data via a collection of tables and views. It is most suitable for data storage and ETL-pipeline generation.

SQLRepresentationSchema

Bases: BaseModel

This model represents the SQL representation of a MITM data set via a collection of SQLAlchemy tables and views. It is not serializable itself but can be generated from a Header object (i.e., pure type information).

Source code in mitm_tooling/representation/sql/common.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class SQLRepresentationSchema(pydantic.BaseModel):
    """
    SQL representation of a MITM data set as a collection of SQLAlchemy tables and views.

    The model itself is not serializable; it can be generated from a `Header`
    (i.e., from pure type information).
    """

    model_config = ConfigDict(arbitrary_types_allowed=True, strict=False)

    mitm: MITM
    sa_meta: sa.MetaData
    meta_tables: HeaderMetaTables | None = None
    concept_tables: ConceptTablesDict = pydantic.Field(default_factory=ConceptTablesDict)
    type_tables: ConceptTypeTablesDict = pydantic.Field(default_factory=ConceptTypeTablesDict)
    views: ViewsDict = pydantic.Field(default_factory=ViewsDict)

    def get_concept_table(self, concept: ConceptName) -> sa.Table | None:
        # None when the concept has no table.
        return self.concept_tables.get(concept, None)

    def get_type_table(self, concept: ConceptName, type_name: TypeName) -> sa.Table | None:
        # None when either the concept or the type within it has no table.
        tables_by_type = self.type_tables.get(concept, {})
        return tables_by_type.get(type_name)

    @property
    def view_tables(self) -> dict[str, sa.Table]:
        # Each view entry is a (table, view properties) pair; only the table is kept.
        return {name: table for name, (table, _props) in self.views.items()}

    @property
    def tables_list(self) -> list[sa.Table]:
        # Tables registered on the metadata, followed by the view tables.
        return [*self.sa_meta.tables.values(), *self.view_tables.values()]

append_data(bind: AnyDBBind, gen_sql_rep_schema: Callable[[], SQLRepresentationSchema], gen_instances: Callable[[], TypedMITMDataFrameStream] = lambda: ()) -> SQLRepInsertionResult

Append a stream of MITM dataframes into a relational database, with tables (and views) defined by the given SQL representation schema. This assumes that the schema is already created. In particular, this implies that the instances to be inserted cannot be of any type not yet present in the schema.

Note that if this function is called with a bind of type Connection (as opposed to an Engine), a manual commit is required after calling it to persist the changes. Internally, all changes are performed within nested transactions.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database to insert into

required
gen_sql_rep_schema Callable[[], SQLRepresentationSchema]

a factory for the SQL representation schema to use

required
gen_instances Callable[[], TypedMITMDataFrameStream]

a factory for a stream of (typed) instances to insert

lambda: ()

Returns:

Type Description
SQLRepInsertionResult

a summary of the inserted instances

Source code in mitm_tooling/representation/sql/sql_mutation/interface.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def append_data(
    bind: AnyDBBind,
    gen_sql_rep_schema: Callable[[], SQLRepresentationSchema],
    gen_instances: Callable[[], TypedMITMDataFrameStream] = lambda: (),
) -> SQLRepInsertionResult:
    """
    Append a stream of MITM dataframes to a relational database whose tables (and views)
    are described by the given SQL representation schema.

    The schema must already exist: no tables are created here, so every instance to be
    inserted must belong to a type that is already present in the schema.

    Note that if this function is called with a `bind` of type `Connection` (as opposed to an `Engine`),
    a manual commit is required after calling it to persist the changes.
    Internally, all changes are performed within nested transactions.

    :param bind: a bind to the database to insert into
    :param gen_sql_rep_schema: a factory for the SQL representation schema to use
    :param gen_instances: a factory for a stream of (typed) instances to insert
    :return: a summary of the inserted instances
    """

    # The schema factory is invoked before the instance factory.
    return insert_instances(bind, gen_sql_rep_schema(), gen_instances())

create_schema(bind: AnyDBBind, gen_sql_rep_schema: Callable[[], SQLRepresentationSchema]) -> None

Create the mitm database schema defined by the given SQL representation schema factory.

Note that if this function is called with a bind of type Connection (as opposed to an Engine), a manual commit is required after calling it to persist the changes. Internally, all changes are performed within nested transactions.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database in which to create the schema

required
gen_sql_rep_schema Callable[[], SQLRepresentationSchema]

a factory for the SQL representation schema to use

required

Returns:

Type Description
None
Source code in mitm_tooling/representation/sql/sql_mutation/interface.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def create_schema(
    bind: AnyDBBind,
    gen_sql_rep_schema: Callable[[], SQLRepresentationSchema],
) -> None:
    """
    Create the MITM database schema described by the SQL representation schema
    that `gen_sql_rep_schema` produces.

    Note that if this function is called with a `bind` of type `Connection` (as opposed to an `Engine`),
    a manual commit is required after calling it to persist the changes.
    Internally, all changes are performed within nested transactions.

    :param bind: a bind to the database in which to create the schema
    :param gen_sql_rep_schema: a factory for the SQL representation schema to use
    :return:
    """

    create_db_schema(bind, gen_sql_rep_schema())

drop_data(bind: AnyDBBind, gen_sql_rep_schema: Callable[[], SQLRepresentationSchema])

Drop all instances from all tables, using the given gen_sql_rep_schema. This preserves the type tables and meta-tables themselves.

Note that if this function is called with a bind of type Connection (as opposed to an Engine), a manual commit is required after calling it to persist the changes. Internally, all changes are performed within nested transactions.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database to drop the instances from

required
gen_sql_rep_schema Callable[[], SQLRepresentationSchema]

a factory for the SQL representation schema to use

required

Returns:

Type Description
Source code in mitm_tooling/representation/sql/sql_mutation/interface.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def drop_data(
    bind: AnyDBBind,
    gen_sql_rep_schema: Callable[[], SQLRepresentationSchema],
) -> None:
    """
    Drop all instances from all tables, using the SQL representation schema produced by `gen_sql_rep_schema`.
    This preserves the type tables and meta-tables themselves.

    Note that if this function is called with a `bind` of type `Connection` (as opposed to an `Engine`),
    a manual commit is required after calling it to persist the changes.
    Internally, all changes are performed within nested transactions.

    :param bind: a bind to the database to drop the instances from
    :param gen_sql_rep_schema: a factory for the SQL representation schema to use
    :raises SQLRepresentationInstanceUpdateError: if dropping the instances fails
    :return:
    """

    sql_rep_schema = gen_sql_rep_schema()
    try:
        drop_type_instances(bind, sql_rep_schema)
    except Exception as e:
        # Re-wrap so callers see a failure of this whole operation; the underlying error is chained.
        raise SQLRepresentationInstanceUpdateError('Dropping of all instances failed') from e

drop_schema(bind: AnyDBBind, gen_sql_rep_schema: Callable[[], SQLRepresentationSchema]) -> None

Drop the mitm database schema defined by the given SQL representation schema factory.

Note that if this function is called with a bind of type Connection (as opposed to an Engine), a manual commit is required after calling it to persist the changes. Internally, all changes are performed within nested transactions.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database to insert into

required
gen_sql_rep_schema Callable[[], SQLRepresentationSchema]

a factory for the SQL representation schema to use

required

Returns:

Type Description
None
Source code in mitm_tooling/representation/sql/sql_mutation/interface.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def drop_schema(
    bind: AnyDBBind,
    gen_sql_rep_schema: Callable[[], SQLRepresentationSchema],
) -> None:
    """
    Drop the MITM database schema described by the SQL representation schema
    that `gen_sql_rep_schema` produces.

    Note that if this function is called with a `bind` of type `Connection` (as opposed to an `Engine`),
    a manual commit is required after calling it to persist the changes.
    Internally, all changes are performed within nested transactions.

    :param bind: a bind to the database whose schema is to be dropped
    :param gen_sql_rep_schema: a factory for the SQL representation schema to use
    :return:
    """
    drop_db_schema(bind, gen_sql_rep_schema())

drop_type_instances(bind: AnyDBBind, sql_rep_schema: SQLRepresentationSchema, types_to_drop: Iterable[tuple[ConceptName, TypeName]] | None = None) -> None

Drop all instances of the given types from the mitm database given by the sql_rep_schema.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database to insert into

required
sql_rep_schema SQLRepresentationSchema

the SQL representation schema to use

required
types_to_drop Iterable[tuple[ConceptName, TypeName]] | None

an iterable of (concept, type_name) tuples to drop instances of

None

Returns:

Type Description
None
Source code in mitm_tooling/representation/sql/sql_mutation/drop_instances.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def drop_type_instances(
    bind: AnyDBBind,
    sql_rep_schema: SQLRepresentationSchema,
    types_to_drop: Iterable[tuple[ConceptName, TypeName]] | None = None,
) -> None:
    """
    Delete the instances of the given types from the MITM database described by `sql_rep_schema`.

    When `types_to_drop` is omitted, every (concept, type_name) pair that has a type table
    in `sql_rep_schema` is targeted. Only instances are removed (`instances_only=True`).

    :param bind: a bind to the database to drop the instances from
    :param sql_rep_schema: the SQL representation schema to use
    :param types_to_drop: an iterable of (concept, type_name) tuples to drop instances of
    :raises SQLRepresentationInstanceUpdateError: if dropping fails
    :return:
    """
    selected_types = (
        types_to_drop
        if types_to_drop is not None
        else [
            (concept, type_name)
            for concept, tables_by_type in sql_rep_schema.type_tables.items()
            for type_name in tables_by_type
        ]
    )
    try:
        # All deletions happen within one nested transaction on the bind.
        with use_nested_conn(bind) as nested_conn:
            _drop_types(nested_conn, sql_rep_schema, selected_types, instances_only=True)
    except Exception as err:
        raise SQLRepresentationInstanceUpdateError('Dropping of instances of types failed') from err

insert_data(bind: AnyDBBind, gen_header: Callable[[], Header], gen_sql_rep_schema: Callable[[Header], SQLRepresentationSchema] = lambda h: mk_sql_rep_schema(h), gen_instances: Callable[[], TypedMITMDataFrameStream] = lambda: ()) -> SQLRepInsertionResult

Insert a stream of MITM dataframes into a relational database, with tables (and views) defined by the given SQL representation schema. The schema is first created and then the data is inserted. Finally, the meta-tables are updated with the header information.

Note that if this function is called with a bind of type Connection (as opposed to an Engine), a manual commit is required after calling it to persist the changes. Internally, all changes are performed within nested transactions.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database to insert into

required
gen_header Callable[[], Header]

a factory for a header to use for the SQL representation schema

required
gen_sql_rep_schema Callable[[Header], SQLRepresentationSchema]

a factory for the SQL representation schema to use

lambda h: mk_sql_rep_schema(h)
gen_instances Callable[[], TypedMITMDataFrameStream]

a factory for a stream of (typed) instances to insert

lambda: ()

Returns:

Type Description
SQLRepInsertionResult

a summary of the inserted instances

Source code in mitm_tooling/representation/sql/sql_mutation/interface.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def insert_data(
    bind: AnyDBBind,
    gen_header: Callable[[], Header],
    gen_sql_rep_schema: Callable[[Header], SQLRepresentationSchema] = lambda h: mk_sql_rep_schema(h),
    gen_instances: Callable[[], TypedMITMDataFrameStream] = lambda: (),
) -> SQLRepInsertionResult:
    """
    Insert a stream of MITM dataframes into a relational database whose tables (and views)
    are described by the SQL representation schema derived from the generated header.

    Steps, in order: create the schema, insert the instances, then record the header
    information in the meta-tables.

    Note that if this function is called with a `bind` of type `Connection` (as opposed to an `Engine`),
    a manual commit is required after calling it to persist the changes.
    Internally, all changes are performed within nested transactions.

    :param bind: a bind to the database to insert into
    :param gen_header: a factory for a header to use for the SQL representation schema
    :param gen_sql_rep_schema: a factory for the SQL representation schema to use
    :param gen_instances: a factory for a stream of (typed) instances to insert
    :return: a summary of the inserted instances
    """

    hdr = gen_header()
    schema = gen_sql_rep_schema(hdr)

    create_db_schema(bind, schema)
    summary = insert_instances(bind, schema, gen_instances())
    # The meta-tables are updated only after the instances were inserted.
    update_meta_data(bind, schema, hdr)

    return summary

mk_sql_rep_schema(header: Header, target_schema: SchemaName | None = SQL_REPRESENTATION_DEFAULT_SCHEMA, skip_fk_constraints: bool = False, skip_views: bool = False, include_meta_tables: bool = True) -> SQLRepresentationSchema

Generate an SQLRepresentationSchema from a Header. The canonical relational MITM representation requires the inclusion of the meta-tables and views.

Parameters:

Name Type Description Default
header Header

the header to generate the schema from

required
target_schema SchemaName | None

the name of the schema to create the tables in. By default, the SQL_REPRESENTATION_DEFAULT_SCHEMA will be used.

SQL_REPRESENTATION_DEFAULT_SCHEMA
skip_fk_constraints bool

whether to skip the generation of foreign key constraints. Defaults to False. FKs can be useful for some external tools, but they may make mutating the database more difficult.

False
skip_views bool

whether to skip the generation of views. Defaults to False.

False
include_meta_tables bool

whether to include the meta-tables. Defaults to True.

True

Returns:

Type Description
SQLRepresentationSchema
Source code in mitm_tooling/representation/sql/sql_representation/mitm_db_schema.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def mk_sql_rep_schema(
    header: Header,
    target_schema: SchemaName | None = SQL_REPRESENTATION_DEFAULT_SCHEMA,
    skip_fk_constraints: bool = False,
    skip_views: bool = False,
    include_meta_tables: bool = True,
) -> SQLRepresentationSchema:
    """
    Build an `SQLRepresentationSchema` for the given `Header`.

    All tables are registered on a fresh `sa.MetaData` for `target_schema`: one table per main
    concept, one table per header entry whose concept has type tables, then (optionally) the
    views and the meta-tables.
    The canonical relational MITM representation requires the inclusion of the meta-tables and views.

    :param header: the header to generate the schema from
    :param target_schema: the name of the schema to create the tables in. By default, the `SQL_REPRESENTATION_DEFAULT_SCHEMA` will be used.
    :param skip_fk_constraints: whether to skip the generation of foreign key constraints. Defaults to False. FKs can be useful for some external tools, but they may make mutating the database more difficult.
    :param skip_views: whether to skip the generation of views. Defaults to False.
    :param include_meta_tables: whether to include the meta-tables. Defaults to True.
    :return: the assembled `SQLRepresentationSchema`
    """

    mitm = header.mitm
    mitm_def = get_mitm_def(mitm)
    meta = sa.MetaData(schema=target_schema)

    # One table per main concept.
    concept_tables: ConceptTablesDict = {
        concept: mk_concept_table(
            meta, mitm, concept, target_schema=target_schema, skip_fk_constraints=skip_fk_constraints
        )
        for concept in mitm_def.main_concepts
    }

    # One table per type, grouped by concept; concepts without type tables are skipped.
    type_tables: ConceptTypeTablesDict = {}
    for entry in header.header_entries:
        entry_concept = entry.concept
        if not has_type_tables(mitm_def, entry_concept):
            continue
        table = mk_type_table(meta, mitm, entry, target_schema, skip_fk_constraints=skip_fk_constraints)
        type_tables.setdefault(entry_concept, {})[entry.type_name] = table

    # Views are built after the tables, from the concept and type tables; meta-tables come last.
    views: dict[str, tuple[sa.Table, ViewProperties]] = (
        {} if skip_views else mk_views(meta, header, concept_tables, type_tables, target_schema=target_schema)
    )
    meta_tables = mk_meta_tables(meta, target_schema=target_schema) if include_meta_tables else None

    return SQLRepresentationSchema(
        mitm=mitm,
        sa_meta=meta,
        meta_tables=meta_tables,
        concept_tables=concept_tables,
        type_tables=type_tables,
        views=views,
    )

mutate_schema(bind: AnyDBBind, gen_current_header: Callable[[], Header], gen_target_header: Callable[[], Header], gen_current_sql_rep_schema: Callable[[Header], SQLRepresentationSchema] = lambda h: mk_sql_rep_schema(h), gen_target_sql_rep_schema: Callable[[Header], SQLRepresentationSchema] = lambda h: mk_sql_rep_schema(h)) -> None

Mutate the header of a mitm db (relational database in canonical mitm format) to a target header. Migrating the derived SQL representation schema entails the following steps:

  1. drop all views that depend on types from the current header
  2. drop all types from the current header
  3. migrate the existing tables to reflect type changes and create new tables for new types
  4. recreate all type-dependent views
  5. update the meta-tables with the new header information

Note that if this function is called with a bind of type Connection (as opposed to an Engine), a manual commit is required after calling it to persist the changes. Internally, all changes are performed within nested transactions.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database whose schema is to be mutated

required
gen_current_header Callable[[], Header]

a factory for creating the current header from which to migrate

required
gen_target_header Callable[[], Header]

a factory for creating the target header to which to migrate

required
gen_current_sql_rep_schema Callable[[Header], SQLRepresentationSchema]

a factory for creating the current SQL representation schema from the current header

lambda h: mk_sql_rep_schema(h)
gen_target_sql_rep_schema Callable[[Header], SQLRepresentationSchema]

a factory for creating the target SQL representation schema from the target header

lambda h: mk_sql_rep_schema(h)

Returns:

Type Description
None
Source code in mitm_tooling/representation/sql/sql_mutation/interface.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def mutate_schema(
    bind: AnyDBBind,
    gen_current_header: Callable[[], Header],
    gen_target_header: Callable[[], Header],
    gen_current_sql_rep_schema: Callable[[Header], SQLRepresentationSchema] = lambda h: mk_sql_rep_schema(h),
    gen_target_sql_rep_schema: Callable[[Header], SQLRepresentationSchema] = lambda h: mk_sql_rep_schema(h),
) -> None:
    """
    Mutate the header of a mitm db (relational database in canonical mitm format) to a target header.
    Migrating the derived SQL representation schema entails the following steps:

    1. drop all views that depend on types from the current header
    2. drop all types from the current header
    3. migrate the existing tables to reflect type changes and create new tables for new types
    4. recreate all type-dependent views
    5. update the meta-tables with the new header information

    Note that if this function is called with a `bind` of type `Connection` (as opposed to an `Engine`),
    a manual commit is required after calling it to persist the changes.
    Internally, all changes are performed within nested transactions.

    :param bind: a bind to the database whose schema is to be mutated
    :param gen_current_header: a factory for creating the current header from which to migrate
    :param gen_target_header: a factory for creating the target header to which to migrate
    :param gen_current_sql_rep_schema: a factory for creating the current SQL representation schema from the current header
    :param gen_target_sql_rep_schema: a factory for creating the target SQL representation schema from the target header
    :return:
    """
    # Factories are invoked in order: both headers first, then the two schemas derived from them.
    current_header, target_header = gen_current_header(), gen_target_header()

    migrate_schema(
        bind,
        current_header,
        target_header,
        gen_current_sql_rep_schema(current_header),
        gen_target_sql_rep_schema(target_header),
    )

MITM Data Transformation

DataFrame

exportable_to_mitm_dataframes_stream(source: AnyDBBind, exportable: Exportable, stream_data: bool = False) -> MITMDataFrameStream

Apply the Exportable to the source database to generate a structured stream of data frames.

See also: StreamingMITMDataFrames.

Parameters:

Name Type Description Default
source AnyDBBind

a bind to the database to query from

required
exportable Exportable

the Exportable to apply

required
stream_data bool

whether to query the database in chunks or not

False

Returns:

Type Description
MITMDataFrameStream

a MITMDataFrameStream

Source code in mitm_tooling/transformation/df/from_exportable.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def exportable_to_mitm_dataframes_stream(
    source: AnyDBBind,
    exportable: Exportable,
    stream_data: bool = False,
) -> MITMDataFrameStream:
    """
    Apply the `Exportable` to the `source` database to generate a structured stream of data frames.

    This is the stream produced by `exportable_to_typed_mitm_dataframes_stream` with the header
    entries left out, so both functions share a single query/typing/renaming implementation
    instead of maintaining two copies of it.
    The stream is lazy: the database bind is only opened once iteration starts, and stays open
    until the stream is exhausted or closed.

    See also: `StreamingMITMDataFrames`.

    :param source: a bind to the database to query from
    :param exportable: the `Exportable` to apply
    :param stream_data: whether to query the database in chunks or not
    :return: a `MITMDataFrameStream`
    :raises MITMTypeError: (during iteration) if a type does not map to exactly one header entry
    """
    import contextlib

    typed_stream = exportable_to_typed_mitm_dataframes_stream(source, exportable, stream_data=stream_data)
    # Close the typed generator deterministically so its database bind is released as soon as this stream ends.
    with contextlib.closing(typed_stream):
        for concept, typed_type_iter in typed_stream:
            # Project (type_name, header_entry, dfs) down to (type_name, dfs), still lazily.
            yield concept, ((type_name, dfs) for type_name, _he, dfs in typed_type_iter)

exportable_to_typed_mitm_dataframes_stream(source: AnyDBBind, exportable: Exportable, stream_data: bool = False) -> TypedMITMDataFrameStream

Apply the Exportable to the source database to generate a structured stream of data frames that includes explicit type information.

See also: StreamingMITMDataFrames.

Parameters:

Name Type Description Default
source AnyDBBind

a bind to the database to query from

required
exportable Exportable

the Exportable to apply

required
stream_data bool

whether to query the database in chunks or not

False

Returns:

Type Description
TypedMITMDataFrameStream

a TypedMITMDataFrameStream

Source code in mitm_tooling/transformation/df/from_exportable.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def exportable_to_typed_mitm_dataframes_stream(
    source: AnyDBBind,
    exportable: Exportable,
    stream_data: bool = False,
) -> TypedMITMDataFrameStream:
    """
    Apply the `Exportable` to the `source` database and stream the resulting data frames,
    annotated with their explicit type information (header entries).

    For every concept, a lazy iterator is yielded; it queries each data provider (in chunks if
    `stream_data` is set), post-processes each chunk, splits it by the concept's typing column
    and yields one renamed data frame per type found in the chunk.

    See also: `StreamingMITMDataFrames`.

    :param source: a bind to the database to query from
    :param exportable: the `Exportable` to apply
    :param stream_data: whether to query the database in chunks or not
    :return: a `TypedMITMDataFrameStream`
    :raises MITMTypeError: (during iteration) if a type does not map to exactly one header entry
    """

    mitm_def = get_mitm_def(exportable.mitm)
    with use_db_bind(source) as conn:
        for concept, providers in exportable.data_providers.items():

            # Defaults bind the current loop values; the closure still reads `conn` and `stream_data`.
            def iter_typed_chunks(
                concept=concept, providers=providers
            ) -> Iterator[tuple[TypeName, HeaderEntry, Iterable[pd.DataFrame]]]:
                for provider in providers:
                    if stream_data:
                        raw_chunks = provider.instance_provider.apply_db_chunked(conn)
                    else:
                        raw_chunks = [provider.instance_provider.apply_db(conn)]

                    for raw_chunk in raw_chunks:
                        chunk = provider.instance_postprocessor.apply_df(raw_chunk)
                        type_column = mitm_def.get_properties(concept).typing_concept
                        row_groups = chunk.groupby(type_column).groups
                        for type_name, row_idx in row_groups.items():
                            # Each type must be described by exactly one header entry.
                            entries = provider.header_entry_provider.apply_df(chunk.loc[row_idx])  # noqa
                            if len(entries) != 1:
                                raise MITMTypeError(f'Expected exactly one header entry per type, got {len(entries)}.')
                            entry = entries[0]
                            # Map the anonymized attribute columns (a_i) to their actual attribute names.
                            renamed = chunk.loc[row_idx].rename(columns=entry.attr_name_map)
                            yield str(type_name), entry, (renamed,)

            yield concept, iter_typed_chunks(concept, providers)

mitm_data_into_mitm_dataframes(mitm_data: MITMData) -> MITMDataFrames

Unpack a MITMData object into a MITMDataFrames object.

Source code in mitm_tooling/transformation/df/from_intermediate.py
69
70
71
72
73
74
75
76
77
def mitm_data_into_mitm_dataframes(mitm_data: MITMData) -> MITMDataFrames:
    """
    Unpack a `MITMData` object into a `MITMDataFrames` object.

    The data is specialized first; each concept table is then split into one data frame per type.
    """
    specialized = mitm_data.as_specialized()
    hdr = specialized.header
    typed_frames = {
        concept: unpack_concept_table_as_typed_dfs(hdr, concept, concept_df)
        for concept, concept_df in specialized
    }
    return MITMDataFrames(header=hdr, dfs=typed_frames)

mitm_dataframes_into_mitm_data(mitm_dataframes: MITMDataFrames) -> MITMData

Convert a MITMDataFrames object into a MITMData object.

Source code in mitm_tooling/transformation/df/into_intermediate.py
67
68
69
70
71
72
73
74
75
76
77
78
def mitm_dataframes_into_mitm_data(mitm_dataframes: MITMDataFrames) -> MITMData:
    """
    Convert a `MITMDataFrames` object into a `MITMData` object.

    The typed data frames of each concept are packed into a single concept table;
    concepts without any typed data frames are omitted. The result is generalized.
    """
    hdr = mitm_dataframes.header
    concept_tables = {}
    for concept, typed_dfs in mitm_dataframes:
        if len(typed_dfs) == 0:
            continue
        concept_tables[concept] = pack_typed_dfs_as_concept_table(hdr, concept, typed_dfs)

    return MITMData(header=hdr, concept_dfs=concept_tables).as_generalized()

streaming_mitm_data_into_typed_mitm_dataframe_stream(streaming_mitm_data: StreamingMITMData) -> TypedMITMDataFrameStream

Incrementally unpack StreamingMITMData into a TypedMITMDataFrameStream.

Source code in mitm_tooling/transformation/df/from_intermediate.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def streaming_mitm_data_into_typed_mitm_dataframe_stream(
    streaming_mitm_data: StreamingMITMData,
) -> TypedMITMDataFrameStream:
    """
    Incrementally unpack `StreamingMITMData` into a `TypedMITMDataFrameStream`.

    Every (data frame, header entries) chunk becomes one (concept, typed iterator) item.
    Chunks without header entries are skipped; a chunk whose header entries span
    several concepts raises `MITMTypeError` when it is reached during iteration.
    """
    mitm = streaming_mitm_data.mitm

    def generate():
        for concept_data in streaming_mitm_data.data_sources.values():
            for chunk_iterator in concept_data.chunk_iterators:
                for df, hes in chunk_iterator:
                    if len(hes) == 0:
                        continue

                    concept = hes[0].concept
                    if any(concept != he.concept for he in hes[1:]):
                        raise MITMTypeError('Inhomogeneous header entries')
                    chunk_header = Header.of(mitm, *hes)

                    # Defaults pin this chunk's values for the lazily-consumed inner generator.
                    def per_type(df=df, chunk_header=chunk_header, concept=concept):
                        entries_by_type = chunk_header.as_dict[concept]
                        typed = unpack_concept_table_as_typed_dfs(chunk_header, concept, df)
                        for type_name, typed_df in typed.items():
                            yield type_name, entries_by_type[type_name], (typed_df,)

                    yield concept, per_type()

    return generate()

streaming_mitm_dataframes_into_streaming_mitm_data(streaming_mitm_dataframes: StreamingMITMDataFrames) -> StreamingMITMData

Convert a StreamingMITMDataFrames object into a StreamingMITMData object.

Source code in mitm_tooling/transformation/df/into_intermediate.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def streaming_mitm_dataframes_into_streaming_mitm_data(
    streaming_mitm_dataframes: StreamingMITMDataFrames,
) -> StreamingMITMData:
    """
    Convert a `StreamingMITMDataFrames` object into a `StreamingMITMData` object.

    The conversion is lazy: for every (sub-)concept, one chunk iterator is registered under its
    parent concept. That iterator packs the typed dataframes into concept-table chunks only when it
    is consumed.

    :param streaming_mitm_dataframes: the streaming typed dataframes to convert
    :return: streaming MITM data with one `StreamingConceptData` per parent concept
    """
    h = streaming_mitm_dataframes.header
    mitm = h.mitm
    mitm_def = get_mitm_def(mitm)
    as_dict = h.as_dict

    # parent concept -> list of (not yet consumed) chunk iterators
    packed = {}
    # maximum `attr_k` per concept of the generalized header (presumably the attribute count);
    # used both for packing and for the structure dataframe so that all chunks share one column layout
    max_ks = {c: max((he.attr_k for he in t_hes.values()), default=0) for c, t_hes in h.as_generalized_dict.items()}

    for concept, type_dfs in streaming_mitm_dataframes.df_iters.items():
        parent_concept = mitm_def.get_parent(concept)

        # loop variables are bound as defaults because the generator is consumed after this loop has finished
        def local_iter(parent_concept=parent_concept, concept=concept, type_dfs=type_dfs):
            for type_name, dfs in type_dfs.items():
                for df in dfs:
                    chunk = pack_typed_dfs_as_concept_table(
                        h, parent_concept, {type_name: df}, max_attr_k_override=max_ks[parent_concept]
                    )
                    yield chunk, [as_dict[concept][type_name]]

        # register the generator itself as ONE chunk iterator; `list += generator` would instead
        # consume it eagerly and flatten the `(chunk, header_entries)` pairs into the list
        packed.setdefault(parent_concept, []).append(local_iter())

    data_sources = {}
    for concept, iters in packed.items():
        structure_df = pd.DataFrame(columns=mk_concept_file_header(mitm, concept, max_ks[concept])[0])
        data_sources[concept] = StreamingConceptData(structure_df=structure_df, chunk_iterators=iters)

    return StreamingMITMData(
        mitm=mitm,
        data_sources=data_sources,
    )

Relational/SQL

append_data(bind: AnyDBBind, gen_sql_rep_schema: Callable[[], SQLRepresentationSchema], gen_instances: Callable[[], TypedMITMDataFrameStream] = lambda: ()) -> SQLRepInsertionResult

Append a stream of MITM dataframes into a relational database, with tables (and views) defined by the given SQL representation schema. This assumes that the schema is already created. In particular, this implies that the instances to be inserted cannot be of any type not yet present in the schema.

Note that if this function is called with a bind of type Connection (as opposed to an Engine), a manual commit is required after calling it to persist the changes. Internally, all changes are performed within nested transactions.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to the database to insert into

required
gen_sql_rep_schema Callable[[], SQLRepresentationSchema]

a factory for the SQL representation schema to use

required
gen_instances Callable[[], TypedMITMDataFrameStream]

a factory for a stream of (typed) instances to insert

lambda: ()

Returns:

Type Description
SQLRepInsertionResult

a summary of the inserted instances

Source code in mitm_tooling/representation/sql/sql_mutation/interface.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def append_data(
    bind: AnyDBBind,
    gen_sql_rep_schema: Callable[[], SQLRepresentationSchema],
    gen_instances: Callable[[], TypedMITMDataFrameStream] = lambda: (),
) -> SQLRepInsertionResult:
    """
    Append a stream of MITM dataframes to a relational database whose tables (and views) follow the given SQL representation schema.
    The schema must already exist in the database. Consequently, only instances of types that are already present in the schema can be inserted.

    If `bind` is a `Connection` rather than an `Engine`, the caller has to commit afterwards to persist the changes.
    Internally, all changes are performed within nested transactions.

    :param bind: a bind to the database to insert into
    :param gen_sql_rep_schema: a factory for the SQL representation schema to use
    :param gen_instances: a factory for a stream of (typed) instances to insert; by default, an empty stream
    :return: a summary of the inserted instances
    """
    # the schema factory is invoked before the instance factory
    return insert_instances(bind, gen_sql_rep_schema(), gen_instances())

append_exportable(target: AnyDBBind, source: AnyDBBind, exportable: Exportable, target_schema: SchemaName | None = None, stream_data: bool = False) -> SQLRepInsertionResult

Insert instances from the source database into the target database, using the ETL-pipeline defined by the Exportable. In contrast to insert_exportable, this function assumes that the schema is already created. As a first step, the SQLRepresentationSchema is derived from the Header of the target database. Then, the data is queried from the source and is inserted into the target. Depending on the stream_data parameter, the data is incrementally queried and then inserted in chunks or first fully loaded and then inserted.

See also append_data.

Source code in mitm_tooling/transformation/sql/into_sql.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def append_exportable(
    target: AnyDBBind,
    source: AnyDBBind,
    exportable: Exportable,
    target_schema: SchemaName | None = None,
    stream_data: bool = False,
) -> SQLRepInsertionResult:
    """
    Insert instances from the `source` database into the `target` database, using the ETL-pipeline defined by the `Exportable`.
    In contrast to `insert_exportable`, this function assumes that the schema is already created.
    As a first step, the `SQLRepresentationSchema` is derived from the `Header` of the target database.
    Then, the data is queried from the source and is inserted into the target.
    Depending on the `stream_data` parameter, the data is incrementally queried and then inserted in chunks or first fully loaded and then inserted.

    See also `append_data`.

    :raises ValueError: if the meta-tables of the target database do not declare a MITM, i.e., no header can be read
    """

    def sql_rep_schema() -> SQLRepresentationSchema:
        from mitm_tooling.transformation.sql import mitm_db_into_header

        h = mitm_db_into_header(target, override_schema=target_schema)
        if h is None:
            # `mitm_db_into_header` returns None when no MITM is declared in the meta-tables;
            # fail with a clear message instead of handing None to `mk_sql_rep_schema`
            raise ValueError(
                'Could not read a MITM header from the meta-tables of the target database'
                f' (schema: {target_schema!r}); the MITM schema has to be created before appending data.'
            )
        return mk_sql_rep_schema(h, target_schema=target_schema)

    def instances() -> TypedMITMDataFrameStream:
        from mitm_tooling.transformation.df import exportable_to_typed_mitm_dataframes_stream

        return exportable_to_typed_mitm_dataframes_stream(source, exportable, stream_data=stream_data)

    return append_data(target, sql_rep_schema, instances)

db_engine_into_db_meta(engine: sa.Engine) -> DBMetaInfo

Introspect an SQLAlchemy engine and return a DBMetaInfo object.

Source code in mitm_tooling/transformation/sql/from_sql.py
 8
 9
10
11
12
13
def db_engine_into_db_meta(engine: sa.Engine) -> DBMetaInfo:
    """
    Reflect the database behind an SQLAlchemy engine and wrap the result in a `DBMetaInfo` object.

    :param engine: the engine to introspect
    :return: the reflected metadata, using the default schema reported by `sa_reflect`
    """
    reflected_meta, extra_meta = sa_reflect(engine)
    default_schema = extra_meta.default_schema
    return DBMetaInfo.from_sa_meta(reflected_meta, default_schema=default_schema)

header_into_db_meta(header: Header, override_schema: str | None = None) -> DBMetaInfo

Derive a DBMetaInfo object from a Header by generating a SQLRepresentationSchema and calling sql_rep_schema_into_db_meta.

Source code in mitm_tooling/transformation/sql/from_intermediate.py
 7
 8
 9
10
11
12
13
14
def header_into_db_meta(header: Header, override_schema: str | None = None) -> DBMetaInfo:
    """
    Build a `DBMetaInfo` object for a `Header`.
    An `SQLRepresentationSchema` is generated from the header and then converted via `sql_rep_schema_into_db_meta`.

    :param header: the header describing the MITM types
    :param override_schema: optional target schema for the generated tables
        (presumably dropped by `notna_kwargs` when None, so that `mk_sql_rep_schema` uses its default)
    :return: the derived database metadata
    """
    from .from_sql import sql_rep_schema_into_db_meta

    schema_kwargs = notna_kwargs(target_schema=override_schema)
    return sql_rep_schema_into_db_meta(mk_sql_rep_schema(header, **schema_kwargs))

insert_exportable(target: AnyDBBind, source: AnyDBBind, exportable: Exportable, target_schema: SchemaName | None = None, stream_data: bool = False) -> SQLRepInsertionResult

Insert instances from the source database into the target database, using the ETL-pipeline defined by the Exportable. First, the database schema, including tables, as defined by the SQLRepresentationSchema is created on the target database. Then, the data is queried from the source and is inserted into the target. Depending on the stream_data parameter, the data is inserted in batches or all at once.

See also insert_data.

Source code in mitm_tooling/transformation/sql/into_sql.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def insert_exportable(
    target: AnyDBBind,
    source: AnyDBBind,
    exportable: Exportable,
    target_schema: SchemaName | None = None,
    stream_data: bool = False,
) -> SQLRepInsertionResult:
    """
    Copy instances from the `source` database into the `target` database via the ETL-pipeline of the `Exportable`.
    The database schema, including the tables defined by the `SQLRepresentationSchema`, is created on the target first.
    Afterwards, the data is queried from the source and inserted into the target.
    The `stream_data` flag selects between inserting in batches and inserting everything at once.

    See also `insert_data`.
    """

    def mk_header() -> Header:
        # the header is generated from the source database
        return exportable.generate_header(source)

    def mk_schema(header: Header):
        return mk_sql_rep_schema(header, target_schema=target_schema)

    def mk_instances() -> TypedMITMDataFrameStream:
        from mitm_tooling.transformation.df import exportable_to_typed_mitm_dataframes_stream

        return exportable_to_typed_mitm_dataframes_stream(source, exportable, stream_data=stream_data)

    return insert_data(target, mk_header, mk_schema, mk_instances)

insert_mitm_data(bind: EngineOrConnection, mitm_data: MITMData, schema_name: SchemaName | None = None) -> SQLRepInsertionResult

Insert MITMData instances into a relational database, using an SQLRepresentationSchema derived from the header of the data. The database schema, including tables, is first created via DDL, then the data is inserted via INSERT statements.

See also insert_data.

Source code in mitm_tooling/transformation/sql/into_sql.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def insert_mitm_data(
    bind: EngineOrConnection,
    mitm_data: MITMData,
    schema_name: SchemaName | None = None,
) -> SQLRepInsertionResult:
    """
    Insert `MITMData` instances into a relational database.
    The `SQLRepresentationSchema` is derived from the header of `mitm_data`, with `schema_name` passed as its target schema.
    The database schema, including tables, is first created via DDL, then the data is inserted via INSERT statements.

    See also `insert_data`.
    """

    def header():
        return mitm_data.header

    def sql_rep_schema(h):
        return mk_sql_rep_schema(h, target_schema=schema_name)

    def instances() -> TypedMITMDataFrameStream:
        from mitm_tooling.transformation.df import mitm_data_into_mitm_dataframes

        dataframes = mitm_data_into_mitm_dataframes(mitm_data)
        return dataframes.as_streaming().typed_stream()

    return insert_data(bind, header, sql_rep_schema, instances)

insert_mitm_dataframes(bind: EngineOrConnection, mitm_dataframes: MITMDataFrames, schema_name: SchemaName | None = None) -> SQLRepInsertionResult

Insert MITMDataFrames instances into a relational database, using an SQLRepresentationSchema derived from the header of the dataframes. The database schema, including tables, is first created via DDL, then the data is inserted via INSERT statements.

See also insert_data.

Source code in mitm_tooling/transformation/sql/into_sql.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def insert_mitm_dataframes(
    bind: EngineOrConnection,
    mitm_dataframes: MITMDataFrames,
    schema_name: SchemaName | None = None,
) -> SQLRepInsertionResult:
    """
    Insert `MITMDataFrames` instances into a relational database.
    The `SQLRepresentationSchema` is derived from the header of `mitm_dataframes`, with `schema_name` passed as its target schema.
    The database schema, including tables, is first created via DDL, then the data is inserted via INSERT statements.

    See also `insert_data`.
    """

    def header():
        return mitm_dataframes.header

    def sql_rep_schema(h):
        return mk_sql_rep_schema(h, target_schema=schema_name)

    def instances() -> TypedMITMDataFrameStream:
        streaming = mitm_dataframes.as_streaming()
        return streaming.typed_stream()

    return insert_data(bind, header, sql_rep_schema, instances)

mitm_data_into_db_meta(mitm_data: MITMData, override_schema: str | None = None) -> DBMetaInfo

Derive a DBMetaInfo object from MITMData by calling header_into_db_meta on the dataset header.

Source code in mitm_tooling/transformation/sql/from_intermediate.py
17
18
19
20
21
def mitm_data_into_db_meta(mitm_data: MITMData, override_schema: str | None = None) -> DBMetaInfo:
    """
    Build a `DBMetaInfo` object for `MITMData` from its header alone (see `header_into_db_meta`).

    :param mitm_data: the data whose header is used; the instances themselves are not inspected
    :param override_schema: optional target schema, forwarded to `header_into_db_meta`
    :return: the derived database metadata
    """
    header = mitm_data.header
    return header_into_db_meta(header, override_schema=override_schema)

mitm_db_into_header(bind: AnyDBBind, override_schema: SchemaName | None = None) -> Header | None

Assuming a database with a MITM representation, reads the type information from the meta-tables.

Parameters:

Name Type Description Default
bind AnyDBBind

a bind to a database

required
override_schema SchemaName | None

the name of the schema in which the tables are located

None

Returns:

Type Description
Header | None

the type information, or None if the meta-tables do not declare a MITM

Source code in mitm_tooling/transformation/sql/into_intermediate.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def mitm_db_into_header(bind: AnyDBBind, override_schema: SchemaName | None = None) -> Header | None:
    """
    Read the MITM type information from the meta-tables of a database that holds a MITM representation.

    :param bind: a bind to a database
    :param override_schema: the name of the schema in which the tables are located
    :return: the type information, or None if the key-value meta-table has no (or an empty) `mitm` entry
    """
    type_cols = ('kind', 'type', 'concept')
    attribute_cols = ('attribute_order', 'attribute_name', 'attribute_dtype')

    meta_tables = mk_meta_tables(sa.MetaData(), **notna_kwargs(target_schema=override_schema))
    with use_db_bind(bind) as conn:
        key_values = dict(conn.execute(sa.select(meta_tables.key_value)).all())
        mitm_str = key_values.get('mitm')
        if not mitm_str:
            return None

        mitm: MITM = MITM(mitm_str)
        # the definition itself is not used here; presumably this fails early for an unknown MITM
        get_mitm_def(mitm)

        # left outer join: types without any attributes still produce a row (with empty attribute columns)
        types_t, attributes_t = meta_tables.types, meta_tables.type_attributes
        rows = conn.execute(
            sa.select(
                *pick_from_mapping(types_t.c, type_cols),
                *pick_from_mapping(attributes_t.c, attribute_cols),
            ).select_from(sa.join(types_t, attributes_t, isouter=True))
        ).all()
        df = pd.DataFrame.from_records(rows, columns=[*type_cols, *attribute_cols])

        header_entries = []
        for (kind, type_name, concept), row_index in df.groupby(list(type_cols)).groups.items():
            # `dropna` discards rows with missing values, e.g. the attribute-less rows of the outer join
            attributes = df.loc[row_index].dropna().sort_values('attribute_order', ascending=True)
            header_entries.append(
                HeaderEntry(
                    concept=concept,
                    kind=kind,
                    type_name=type_name,
                    attributes=tuple(attributes['attribute_name']),
                    attribute_dtypes=tuple(attributes['attribute_dtype']),
                )
            )

        return Header(mitm=mitm, header_entries=frozenset(header_entries))

mk_sqlite(mitm_data: MITMData, file_path: FilePath | None = ':memory:', autoclose: bool = True) -> sa.Engine

Insert mitm_data into a SQLite database. It is created if it does not exist. Uses insert_mitm_data.

Source code in mitm_tooling/transformation/sql/into_sql.py
119
120
121
122
123
124
125
126
127
128
def mk_sqlite(mitm_data: MITMData, file_path: FilePath | None = ':memory:', autoclose: bool = True) -> sa.Engine:
    """
    Insert `mitm_data` into a SQLite database. It is created if it does not exist. Uses `insert_mitm_data`.

    :param mitm_data: the data to insert
    :param file_path: the path of the database file; `None` or `':memory:'` selects an in-memory database
    :param autoclose: whether to dispose of the engine's connection pool after the insertion.
        This is ignored for in-memory databases, whose content only lives as long as the single pooled connection.
    :return: the engine bound to the database
    """
    # previously, None was formatted into the URL as a file literally named "None"
    in_memory = file_path is None or str(file_path) == ':memory:'
    db_path = ':memory:' if in_memory else str(file_path)
    # StaticPool keeps exactly one connection, so an in-memory database survives across checkouts
    engine = create_sa_engine(AnyUrl(f'sqlite:///{db_path}'), poolclass=sa.StaticPool)
    insert_mitm_data(engine, mitm_data)
    if autoclose and not in_memory:
        # disposing closes the pooled connection, which would discard an in-memory database
        # and leave the returned engine pointing at an empty one
        engine.dispose()
    return engine

sql_rep_into_exportable(header: Header, sql_rep_schema: SQLRepresentationSchema) -> Exportable

Create an Exportable from a Header by binding the concepts and types to the tables specified in the SQLRepresentationSchema.

Source code in mitm_tooling/transformation/sql/into_exportable.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def sql_rep_into_exportable(header: Header, sql_rep_schema: SQLRepresentationSchema) -> Exportable:
    """
    Build an `Exportable` for a `Header` whose concepts and types are bound to the tables of an `SQLRepresentationSchema`.

    For each header entry, its type table is used if its concept has type tables; otherwise, the concept table is used.

    :param header: the header whose types are to be exported
    :param sql_rep_schema: the SQL representation schema that holds the tables
    :return: an exportable with one `DataProvider` per header entry, grouped by concept
    :raises MITMTypeError: if a type has no corresponding table in the SQL representation schema
    """
    mitm_def = header.mitm_def
    data_providers: dict[ConceptName, list[DataProvider]] = {}

    for he in header.header_entries:
        concept = he.concept
        props = mitm_def.get_properties(concept)

        if has_type_tables(mitm_def, concept):
            sa_table = sql_rep_schema.get_type_table(concept, he.type_name)
        else:
            sa_table = sql_rep_schema.get_concept_table(concept)

        if sa_table is None:
            raise MITMTypeError(f'Type {he.concept}:{he.type_name} is not present in the SQL representation schema.')

        table_meta = TableMetaInfo.from_sa_table(sa_table)
        # the type of an instance is read from the column named after the concept's typing concept
        provider = DataProvider(
            instance_provider=InstancesProvider(
                virtual_view=VirtualView(table_meta=table_meta, from_clause=sa_table, sa_table=sa_table)
            ),
            header_entry_provider=HeaderEntryProvider(
                concept=concept,
                table_meta=table_meta,
                kind_provider=ColumnContentProvider.from_static('kind', he.kind),
                type_provider=ColumnContentProvider.from_static(props.typing_concept, he.type_name),
                attributes=list(he.attributes),
                attribute_dtypes=list(he.attribute_dtypes),
            ),
            instance_postprocessor=InstancesPostProcessor(),
        )
        data_providers.setdefault(concept, []).append(provider)

    return Exportable(mitm=header.mitm, data_providers=data_providers)

sql_rep_into_mappings(header: Header, sql_rep_schema: SQLRepresentationSchema) -> list[ConceptMapping]

Generate a list of ConceptMappings from a Header and an SQLRepresentationSchema. The result can be used to create a Mapping.

Source code in mitm_tooling/transformation/sql/into_mappings.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def sql_rep_into_mappings(header: Header, sql_rep_schema: SQLRepresentationSchema) -> list[ConceptMapping]:
    """
    Generate `ConceptMapping`s from a `Header` and an `SQLRepresentationSchema`.
    The result can be used to create a `Mapping`.

    The base table of a mapping is the type table of the header entry, if one exists.
    Otherwise, it is the concept table of the entry's parent (main) concept.
    Header entries for which neither table exists are skipped.
    Foreign relations are only included if the concept table of their target concept exists.

    :param header: the header whose types are to be mapped
    :param sql_rep_schema: the SQL representation schema that holds the tables
    :return: one concept mapping per header entry that has a base table
    """
    mitm_def = header.mitm_def
    mappings = []
    for he in header.header_entries:
        concept_properties, relations = mitm_def.get(he.concept)
        main_concept = mitm_def.get_parent(he.concept)

        base_table = sql_rep_schema.get_type_table(he.concept, he.type_name)
        if base_table is None:
            base_table = sql_rep_schema.get_concept_table(main_concept)
        if base_table is None:
            continue

        foreign_relations = {}
        for fk_name, fk_info in relations.foreign.items():
            target_concept_t = sql_rep_schema.concept_tables.get(fk_info.target_concept)
            if target_concept_t is None:
                continue
            foreign_relations[fk_name] = ForeignRelation(
                fk_columns=list(fk_info.fk_relations.keys()),
                referred_table=(SourceDBType.OriginalDB, target_concept_t.schema, target_concept_t.name),
            )

        mappings.append(
            ConceptMapping(
                mitm=header.mitm,
                concept=he.concept,
                base_table=(SourceDBType.OriginalDB, base_table.schema, base_table.name),
                kind_col='kind' if 'kind' in base_table.columns else None,
                type_col=concept_properties.typing_concept,
                identity_columns=list(relations.identity.keys()),
                inline_relations=list(relations.inline.keys()),
                foreign_relations=foreign_relations,
                attributes=list(he.attributes),
                attribute_dtypes=list(he.attribute_dtypes),
            )
        )

    return mappings

sql_rep_schema_into_db_meta(sql_rep_schema: SQLRepresentationSchema, default_schema: str = SQL_REPRESENTATION_DEFAULT_SCHEMA) -> DBMetaInfo

Derive a DBMetaInfo object from an SQLRepresentationSchema.

Source code in mitm_tooling/transformation/sql/from_sql.py
16
17
18
19
20
21
22
def sql_rep_schema_into_db_meta(
    sql_rep_schema: SQLRepresentationSchema, default_schema: str = SQL_REPRESENTATION_DEFAULT_SCHEMA
) -> DBMetaInfo:
    """
    Build a `DBMetaInfo` object from all tables of an `SQLRepresentationSchema`.

    :param sql_rep_schema: the SQL representation schema whose tables are described
    :param default_schema: the schema recorded for tables that do not declare one themselves
    :return: the derived database metadata
    """
    tables = sql_rep_schema.tables_list
    return DBMetaInfo.from_sa_tables(default_schema, *tables)

Superset Asset Definitions and Generation

MITM Data Extraction from Relational Databases

DB Introspection Models

Representations for metadata of relational databases.

DBMetaInfo

Bases: DBMetaInfoBase

This model represents the metadata of a relational database via a structured collection of table metadata. It extends the base model with additional DB metadata, in particular a SQLAlchemy MetaData object. It is therefore not serializable.

It can be derived from SQLAlchemy metadata such as a list of Tables or a MetaData object.

Source code in mitm_tooling/extraction/relational/data_models/db_meta.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
class DBMetaInfo(DBMetaInfoBase):
    """
    Metadata of a relational database, organized as table metadata per schema.

    In addition to the base model, it carries a SQLAlchemy `MetaData` object that holds the underlying tables.
    It is therefore not serializable.

    Instances are usually derived from SQLAlchemy metadata, i.e., from a collection of `Tables` or a `MetaData` object.
    """

    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)

    # schema name -> table name -> table metadata
    db_structure: dict[SchemaName, dict[TableName, TableMetaInfo]]
    default_schema: SchemaName
    sa_meta: MetaData

    @cached_property
    def tables(self) -> dict[ShortTableIdentifier, TableMetaInfo]:
        # flattened view of `db_structure`, keyed by each table's short identifier
        return {
            tm.short_table_identifier: tm
            for schema_tables in self.db_structure.values()
            for tm in schema_tables.values()
        }

    @classmethod
    def from_sa_tables(cls, default_schema: SchemaName, *tables: Table) -> Self:
        """Collect the given tables into a new `MetaData` object and describe each of them."""
        meta = sa.MetaData()
        db_structure = {}
        for table in tables:
            # NOTE: relies on the private `MetaData._add_table` to register the given table object (TODO: not so clean)
            meta._add_table(table.name, table.schema, table)
            tm = TableMetaInfo.from_sa_table(table, default_schema=default_schema)
            db_structure.setdefault(tm.schema_name, {})[tm.name] = tm

        return cls(db_structure=db_structure, default_schema=default_schema, sa_meta=meta)

    @classmethod
    def from_sa_meta(cls, meta: MetaData, default_schema: SchemaName) -> Self:
        """Describe all tables of `meta`. The result holds a fresh `MetaData` object, not `meta` itself."""
        return cls.from_sa_tables(default_schema, *meta.tables.values())

    def search_table(self, schema: SchemaName, table: TableName) -> TableMetaInfo | None:
        """Return the metadata of `schema`.`table`, or None if it is unknown."""
        schema_tables = self.db_structure.get(schema, {})
        return schema_tables.get(table)

    def filter_shallow(
        self,
        table_selection: ExplicitTableSelection | None = None,
        column_selection: ExplicitColumnSelection | None = None,
    ) -> Self:
        """
        Restrict the metadata to the selected tables and columns, sharing this instance's `MetaData` object.
        Without a table selection, an unfiltered deep copy is returned (the column selection is ignored as well).
        """
        if table_selection is None:
            return self.model_copy(deep=True)

        filtered_structure = {
            schema: {
                table_name: tm.filter_shallow(ExplicitSelectionUtils.relevant_col_set(tm, column_selection))
                for table_name, tm in tables.items()
                if ExplicitSelectionUtils.table_survives(tm, table_selection)
            }
            for schema, tables in self.db_structure.items()
        }
        return type(self)(
            db_structure=filtered_structure,
            default_schema=self.default_schema,
            sa_meta=self.sa_meta,
        )

    def filter(
        self,
        table_selection: ExplicitTableSelection | None = None,
        column_selection: ExplicitColumnSelection | None = None,
    ) -> Self:
        """
        Restrict the metadata to the selected tables and columns, building the filtered tables in a new `MetaData` object.
        Without a table selection, an unfiltered deep copy is returned (the column selection is ignored as well).
        """
        if table_selection is None:
            return self.model_copy(deep=True)

        new_meta = MetaData()
        # schema -> table name -> (filtered table metadata, optional fix-up callback)
        results_by_schema = {
            schema: {
                table_name: tm.filter(self, new_meta, table_selection, column_selection)
                for table_name, tm in tables.items()
                if ExplicitSelectionUtils.table_survives(tm, table_selection)
            }
            for schema, tables in self.db_structure.items()
        }
        new_dbm = type(self)(
            db_structure={
                schema: {table_name: filtered_tm for table_name, (filtered_tm, _) in results.items()}
                for schema, results in results_by_schema.items()
            },
            default_schema=self.default_schema,
            sa_meta=new_meta,
        )
        # the fix-up callbacks receive the complete new instance, so they can only run after it has been built
        for results in results_by_schema.values():
            for _, fixme in results.values():
                if fixme:
                    fixme(new_dbm)
        return new_dbm

DBProbe

Bases: DBProbeBase

This model represents a probe of a relational database, via a structured collection of table probes. It additionally holds structural information in the form of a full DBMetaInfo object. It is therefore not serializable.

Source code in mitm_tooling/extraction/relational/data_models/db_probe.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class DBProbe(DBProbeBase):
    """
    Probe results of a relational database, organized as table probes per schema.

    Besides the probes, it holds the full `DBMetaInfo` of the probed database.
    It is therefore not serializable.
    """

    db_meta: DBMetaInfo
    # schema name -> table name -> probe
    db_table_probes: dict[SchemaName, dict[TableName, TableProbe]] = Field(default_factory=dict)

    @property
    def table_probes(self) -> dict[ShortTableIdentifier, TableProbe]:
        # flattened view of `db_table_probes`, keyed by (schema name, table name)
        flat = {}
        for schema_name, schema_probes in self.db_table_probes.items():
            for table_name, tp in schema_probes.items():
                flat[(schema_name, table_name)] = tp
        return flat

    def update_meta(self, new_db_meta: DBMetaInfo):
        """
        Replace the database metadata and prune the probes accordingly.

        If metadata was present before, only the probes of tables whose metadata exists and is equal in both the
        old and the new metadata are kept. A probe dropped because its table changed or vanished (within a schema
        present in both) is logged; all other dropped probes are removed silently.
        """
        if self.db_meta is not None:
            kept_probes = {}
            for schema_name, old_tables in self.db_meta.db_structure.items():
                if schema_name not in self.db_table_probes:
                    continue
                new_tables = new_db_meta.db_structure.get(schema_name)
                if new_tables is None:
                    continue

                schema_probes = self.db_table_probes[schema_name]
                kept_in_schema = {}
                for table_name, old_table in old_tables.items():
                    if table_name not in schema_probes:
                        continue
                    new_table = new_tables.get(table_name)
                    if new_table is not None and new_table == old_table:
                        kept_in_schema[table_name] = schema_probes[table_name]
                    else:
                        logger.info(f'Removed table probe of {schema_name}.{table_name} due to metadata refresh.')

                if kept_in_schema:
                    kept_probes[schema_name] = kept_in_schema

            self.db_table_probes = kept_probes
        self.db_meta = new_db_meta

    def update_probes(self, *probes: tuple[ShortTableIdentifier, TableProbe]):
        """Insert or overwrite the given `((schema name, table name), probe)` pairs."""
        for (schema_name, table_name), tp in probes:
            self.db_table_probes.setdefault(schema_name, {})[table_name] = tp

    def drop_probes(self, *to_drop: ShortTableIdentifier):
        """Remove the probes of the given tables. Unknown tables are ignored; emptied schema entries are kept."""
        for schema_name, table_name in to_drop:
            schema_probes = self.db_table_probes.get(schema_name, {})
            if table_name in schema_probes:
                del schema_probes[table_name]

TableMetaInfo

Bases: TableMetaInfoBase

This model represents the metadata of a table in a relational database. It extends the base model with additional information about the source of the table, in particular a SQLAlchemy Table object and a Queryable. It is therefore not serializable.

Source code in mitm_tooling/extraction/relational/data_models/db_meta.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class TableMetaInfo(TableMetaInfoBase):
    """
    This model represents the metadata of a table in a relational database.
    It extends the base model with additional information about the source of the table, in particular a SQLAlchemy `Table` object and a `Queryable`.
    It is therefore not serializable.
    """

    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)

    foreign_key_constraints: list[ForeignKeyConstraint] = Field(default_factory=list)
    sa_table: Table
    queryable_source: Queryable

    @classmethod
    def from_sa_table(
        cls, t: Table, queryable_source: Queryable | None = None, default_schema: str | None = None
    ) -> Self:
        """
        Derive the table metadata from a SQLAlchemy `Table`.

        Args:
            t: The table to describe.
            queryable_source: The selectable the table's rows are read from. Defaults to `t` itself.
            default_schema: Schema name used when `t.schema` is not set.

        Returns:
            A new instance describing the columns, column types, primary key (or `None` if `t` has none),
            indexes, foreign key constraints and per-column properties of `t`.
        """
        fkcs = [
            ForeignKeyConstraint.from_sa_constraint(fkc, t.schema or default_schema)
            for fkc in t.foreign_key_constraints
        ]
        col_props = {
            c.name: ColumnProperties(
                nullable=c.nullable,
                unique=bool(c.unique),
                part_of_index=any(c.name in ind.columns for ind in t.indexes),
                part_of_pk=c.primary_key,
                part_of_fk=len(c.foreign_keys) > 0,
                mitm_data_type=sa_sql_to_mitm_type(c.type),
            )
            for c in t.columns
        }
        return cls(
            name=t.name,
            columns=[c.name for c in t.columns],
            sql_column_types=[str(c.type) for c in t.columns],
            primary_key=[c.name for c in t.primary_key] if t.primary_key else None,
            indexes=[list(ind.columns.keys()) for ind in t.indexes],
            foreign_key_constraints=fkcs,
            schema_name=t.schema or default_schema,
            column_properties=col_props,
            sa_table=t,
            queryable_source=queryable_source if queryable_source is not None else t,
        )

    def filter_shallow(self, column_selection: set[ColumnName] | None = None) -> Self:
        """
        Return a copy of this metadata restricted to the selected columns.

        With `column_selection=None`, a deep copy is returned.
        Otherwise:
        - only selected columns (in their original order) and their properties are kept;
        - the primary key is kept only if it exists and all of its columns are selected;
        - indexes and foreign key constraints are kept only if all of their columns are selected.

        The underlying `sa_table` is shared with this instance, not filtered.
        """
        if column_selection is None:
            return self.model_copy(deep=True)
        else:
            cols = [c for c in self.columns if c in column_selection]
            sql_column_types = [str(self.sa_table.columns[c].type) for c in cols]
            # Tables without a primary key carry `primary_key=None` (see `from_sa_table`); it cannot be iterated.
            primary_key = (
                self.primary_key
                if self.primary_key is not None and all(c in column_selection for c in self.primary_key)
                else None
            )
            indexes = [ind for ind in self.indexes if all(c in column_selection for c in ind)]
            fkcs = [fkc for fkc in self.foreign_key_constraints if all(c in column_selection for c in fkc.columns)]
            col_props = {c: props for c, props in self.column_properties.items() if c in column_selection}
            return self.__class__(
                name=self.name,
                schema_name=self.schema_name,
                sa_table=self.sa_table,
                columns=cols,
                sql_column_types=sql_column_types,
                primary_key=primary_key,
                indexes=indexes,
                foreign_key_constraints=fkcs,
                column_properties=col_props,
                # Explicit None check: truthiness of SQLAlchemy clause objects is not reliably defined.
                queryable_source=self.queryable_source if self.queryable_source is not None else self.sa_table,
            )

    def filter(
        self,
        dbm: DBMetaInfo,
        meta: MetaData,
        table_selection: ExplicitTableSelection | None = None,
        column_selection: ExplicitColumnSelection | None = None,
    ) -> Self | tuple[Self, Callable[[DBMetaInfo], ...]]:
        """
        Create filtered metadata backed by a fresh `sa.Table` registered in `meta`.

        Args:
            dbm: Metadata of the database this table belongs to. Used to check which foreign key constraints remain valid.
            meta: SQLAlchemy metadata in which the new table is created.
            table_selection: Optional explicit table selection, used for the foreign key validity check.
            column_selection: Optional explicit column selection.

        Returns:
            A tuple `(new_table_meta, fixup)`. `fixup` is `None` if no foreign key constraint remains valid.
            Otherwise it is a callback that, given the new `DBMetaInfo`, adds the valid foreign key constraints
            that `to_sa_constraint` can resolve to the new table. It has to be invoked once the new metadata exists.
        """
        tm = self.filter_shallow(column_selection=ExplicitSelectionUtils.relevant_col_set(self, column_selection))

        valid_fks = [
            fkc for fkc in tm.foreign_key_constraints if fkc.is_still_valid(dbm, tm, table_selection, column_selection)
        ]

        # NOTE(review): the new columns carry only name and type, so primary-key/index information is not
        # reflected in `new_tm` -- confirm this is intended.
        new_sa = sa.Table(
            self.name, meta, *(sa.Column(n, tm.sa_table.c[n].type) for n in tm.columns), schema=self.schema_name
        )

        new_tm = self.from_sa_table(new_sa, self.queryable_source, default_schema=dbm.default_schema)

        fixme = None
        if valid_fks:

            def fixme(new_dbm):
                new_tm.sa_table.foreign_key_constraints.update(
                    sa_fkc for fkc in valid_fks if (sa_fkc := fkc.to_sa_constraint(new_dbm)) is not None
                )

        return new_tm, fixme

VirtualDB

Bases: VirtualDBBase

This model represents a virtual database via a structured collection of virtual views. As it contains SQLAlchemy objects, it is not serializable.

It can be mutated by adding and removing virtual views. Given a specific SQL dialect, it can also be compiled into a serializable CompiledVirtualDB, which in turn can be used as a VirtualDBCreation in a StandaloneDBMapping.

Source code in mitm_tooling/extraction/relational/data_models/virtual_view.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class VirtualDB(VirtualDBBase):
    """
    A virtual database: a collection of virtual views grouped by schema name and view name.

    Because the views hold SQLAlchemy objects, instances cannot be serialized.
    Views can be added and removed. For a given SQL dialect, the whole database can be compiled into a
    serializable `CompiledVirtualDB`, which can in turn serve as a `VirtualDBCreation` in a `StandaloneDBMapping`.
    """

    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)

    virtual_views: dict[SchemaName, dict[TableName, VirtualView]] = Field(default_factory=dict)
    sa_meta: sa.MetaData = Field(default_factory=lambda: sa.MetaData(schema='virtual'))

    @pydantic.computed_field(repr=False)
    @property
    def views(self) -> dict[ShortTableIdentifier, VirtualView]:
        """All views, flattened and keyed by the short table identifier of their table metadata."""
        flat = {}
        for schema_views in self.virtual_views.values():
            for vv in schema_views.values():
                flat[vv.table_meta.short_table_identifier] = vv
        return flat

    def put_view(self, vv: VirtualView):
        """Register `vv` under its schema and table name, replacing any view already stored there."""
        tm = vv.table_meta
        self.virtual_views.setdefault(tm.schema_name, {})[tm.name] = vv

    def get_view(self, schema: SchemaName, view: TableName) -> VirtualView | None:
        """Return the view `schema.view`, or `None` if it is not registered."""
        schema_views = self.virtual_views.get(schema)
        if schema_views is None:
            return None
        return schema_views.get(view)

    def drop_view(self, schema: SchemaName, view: TableName):
        """
        Remove the view `schema.view` and its table from `sa_meta`; unknown views are ignored.
        The schema entry is removed once its last view is dropped.
        """
        vv = self.get_view(schema, view)
        if vv is None:
            return
        self.sa_meta.remove(vv.sa_table)
        schema_views = self.virtual_views[schema]
        del schema_views[view]
        if not schema_views:
            del self.virtual_views[schema]

    def update_views(self) -> None:
        """Recompute the table metadata of every view."""
        all_views = self.views
        for vv in all_views.values():
            vv.update_table_meta()

    def to_db_meta_info(self) -> DBMetaInfo:
        """Expose the views' table metadata as a `DBMetaInfo` that shares this database's `sa_meta`."""
        structure = {
            schema: {view: vv.table_meta for view, vv in schema_views.items()}
            for schema, schema_views in self.virtual_views.items()
        }
        return DBMetaInfo(
            db_structure=structure,
            sa_meta=self.sa_meta,
            default_schema=VIRTUAL_DB_DEFAULT_SCHEMA,
        )

    def as_compiled(self, dialect: sa.Dialect) -> CompiledVirtualDB:
        """Compile every view for `dialect` into a serializable `CompiledVirtualDB`."""
        from .compiled import CompiledVirtualDB

        compiled_views = [vv.as_compiled(dialect) for vv in self.views.values()]
        return CompiledVirtualDB(compiled_virtual_views=compiled_views)

VirtualView

Bases: VirtualViewBase

This model represents a virtual view in a relational database, i.e., a selectable SQL query in combination with its column metadata. As it contains SQLAlchemy objects, it is not serializable.

Given a specific SQL dialect, it can also be compiled into a serializable CompiledVirtualView.

Source code in mitm_tooling/extraction/relational/data_models/virtual_view.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class VirtualView(VirtualViewBase):
    """
    A virtual view: a selectable SQL query (`from_clause`) together with a table that describes its columns.

    Because it holds SQLAlchemy objects, it cannot be serialized.
    For a given SQL dialect, it can be compiled into a serializable `CompiledVirtualView`.
    """

    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)

    table_meta: TableMetaInfo
    from_clause: sa.FromClause
    sa_table: sa.Table

    @classmethod
    def from_from_clause(
        cls,
        name: str,
        from_clause: sa.FromClause,
        meta: sa.MetaData,
        schema: SchemaName = 'virtual',
        delete_if_exists: bool = True,
    ) -> Self | None:
        """
        Build a view over `from_clause` and register a matching table `schema.name` in `meta`.

        The table mirrors the column names, types and primary-key flags of `from_clause`.
        If `meta` already contains `schema.name`, that table is removed first when `delete_if_exists` is true;
        otherwise nothing is created and `None` is returned.
        """
        columns = [
            sa.Column(col_name, col.type, primary_key=col.primary_key)
            for col_name, col in from_clause.columns.items()
        ]

        existing = meta.tables.get(qualify(schema=schema, table=name), None)
        if existing is not None:
            if not delete_if_exists:
                return None
            meta.remove(existing)

        table = sa.Table(name, meta, *columns, schema=schema)
        table_meta = TableMetaInfo.from_sa_table(table, queryable_source=from_clause, default_schema=schema)
        return cls(table_meta=table_meta, from_clause=from_clause, sa_table=table)

    def update_table_meta(self) -> None:
        """Recompute `table_meta` from the current state of `sa_table`."""
        table = self.sa_table
        self.table_meta = TableMetaInfo.from_sa_table(
            table, queryable_source=self.from_clause, default_schema=table.schema
        )

    def as_compiled(self, dialect: sa.Dialect) -> CompiledVirtualView:
        """Compile the view's query for `dialect` into a serializable `CompiledVirtualView`."""
        from .compiled import CompiledVirtualView, TypedRawQuery

        sql = TypedRawQuery.compile_from_clause(self.from_clause, dialect)
        meta = self.table_meta
        return CompiledVirtualView(
            name=meta.name,
            schema_name=meta.schema_name,
            dialect=dialect.name,
            compiled_sql=sql,
            columns=tuple(meta.columns),
            column_dtypes=tuple(meta.sql_column_types),
        )

DB Introspection

Supporting functionality for inferring the metadata of relational databases.

SQL Transformations for ETL-Pipelines

Representations and supporting functionality for transformations on relational databases.

Mapping SQL to MITM Concepts

Representations and supporting functionality for mappings of relational data to MITM concepts.

BoundExportable

Bases: BaseModel

An Exportable that is bound to a specific database connection.

Source code in mitm_tooling/extraction/relational/mapping/export.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class BoundExportable(pydantic.BaseModel):
    """
    Pairs an `Exportable` with a concrete database bind, so the generation methods need no further arguments.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    bind: AnyDBBind
    exportable: Exportable

    def generate_header(self) -> Header:
        """Generate the header of the wrapped exportable against the stored bind."""
        exportable, bind = self.exportable, self.bind
        return exportable.generate_header(bind)

    def generate_mitm_data(self) -> MITMData:
        """Generate the full (in-memory) MITM data of the wrapped exportable against the stored bind."""
        exportable, bind = self.exportable, self.bind
        return exportable.generate_mitm_data(bind)

    def generate_streaming_mitm_data(self, streaming_chunk_size: int = STREAMING_CHUNK_SIZE) -> StreamingMITMData:
        """Generate chunked, streaming MITM data of the wrapped exportable against the stored bind."""
        exportable, bind = self.exportable, self.bind
        return exportable.generate_streaming_mitm_data(bind, streaming_chunk_size=streaming_chunk_size)

ConceptMapping

Bases: StaticValidableGroup, BaseModel

This model represents a mapping of a database table to a MITM concept. Specifically, it records how the columns in the table correspond to the ones required by the concept in the specified MITM.

Given database schema information, it can be validated and produce a SQL selectable source of instances of the concept (in the format of the intermediate representation).

Source code in mitm_tooling/extraction/relational/mapping/concept_mapping.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
class ConceptMapping(StaticValidableGroup, pydantic.BaseModel):
    """
    This model represents a mapping of a database table to a MITM concept.
    Specifically, it records how the columns in the table correspond to the ones required by the concept in the specified MITM.

    Given database schema information, it can be validated and produce a SQL selectable source of instances of the concept (in the format of the intermediate representation).
    """

    mitm: MITM
    concept: ConceptName

    base_table: AnyTableIdentifier
    kind_col: ColumnName | None = None
    type_col: ColumnName

    identity_columns: dict[RelationName, ColumnName] | list[ColumnName] = Field(default_factory=dict)
    inline_relations: dict[RelationName, ColumnName] | list[ColumnName] = Field(default_factory=dict)
    foreign_relations: dict[RelationName, ForeignRelation] = Field(default_factory=dict)

    attributes: list[ColumnName] = Field(default_factory=list)
    attribute_dtypes: list[MITMDataType] = Field(default_factory=list)

    @cached_property
    def identity_provider(self) -> IdentityProvider:
        return IdentityProvider(identity_columns=normalize_into_dict(self.identity_columns))

    @cached_property
    def inline_relations_provider(self) -> InlineRelationsProvider:
        return InlineRelationsProvider(inline_relations=normalize_into_dict(self.inline_relations))

    @cached_property
    def foreign_relations_provider(self) -> ForeignRelationsProvider:
        return ForeignRelationsProvider(foreign_relations=self.foreign_relations)

    def get_k(self) -> int:
        """Number of attributes declared by this mapping (one data type per attribute)."""
        return len(self.attribute_dtypes)

    def validate_static(self, context: MappingGroupValidationContext) -> None:
        """
        Statically check this mapping against the schema information in `context`.

        Checks include table presence, concept kind/type columns, identity/inline/foreign relations,
        and attribute columns. The results are recorded in the derived validation context, which is then
        included into `context`. If the base table is missing, only that failure is recorded.
        """
        ctxt = context.derive_individual(
            claimed_concept=self.concept, examined_table=TableIdentifier.from_any(self.base_table)
        )
        vr = ctxt.vr
        table_meta = ctxt.examined_table_meta
        if table_meta is None:
            vr.failed('table not present in db schema')
            return

        vr.include_check(not ctxt.is_base_concept, 'base concepts cannot be mapped explicitly')
        if ctxt.is_abstract_concept:
            vr.include_check(self.kind_col is not None, 'abstract concepts need a column defining their kind')
        if self.kind_col is not None:
            vr.include_check(
                self.kind_col in table_meta.col_set, f'specified kind column ({self.kind_col}) not present in table'
            )

        vr.include_check(
            self.type_col in table_meta.col_set, f'specified type column ({self.type_col}) not present in table'
        )

        self.identity_provider.validate_static(ctxt)
        self.inline_relations_provider.validate_static(ctxt)
        self.foreign_relations_provider.validate_static(ctxt)

        vr.include_check(
            ctxt.relevant_properties.permit_attributes or len(self.attributes) == 0,
            'this concept does not permit attributes',
        )
        vr.include_check(
            len(self.attributes) == len(self.attribute_dtypes),
            'number of declared attribute data types not equal to attributes',
        )
        vr.include_check(set(self.attributes) <= table_meta.col_set, 'attribute columns not present in table')

        # TODO validate data types

        context.include_individual(ctxt)

    def apply(self, db_metas: dict[SourceDBType, DBMetaInfo]) -> tuple[HeaderEntryProvider, Queryable]:
        """
        Build the SQL source of concept instances described by this mapping.

        Args:
            db_metas: Database metadata per source, used to resolve `base_table`.

        Returns:
            A tuple `(header_entry_provider, queryable)`. `queryable` is a subquery that selects the
            concept's columns (kind, type, identity, inline/foreign relations, attributes `a_1..a_k`)
            from the base table.

        Raises:
            TableNotFoundException: if `base_table` cannot be resolved in `db_metas`.
        """
        mitm_def = get_mitm_def(self.mitm)
        concept_properties = mitm_def.get_properties(self.concept)

        table_meta = TableIdentifier.resolve_id(self.base_table, db_metas)
        if table_meta is None:
            raise TableNotFoundException(
                f'Base table {self.base_table} not found for export request: {self.mitm}:{self.concept}'
            )

        base_queryable = table_meta.queryable_source

        def make_type_col() -> tuple[str, sa.ColumnElement]:
            return concept_properties.typing_concept, sa.label(
                concept_properties.typing_concept, col_by_name(base_queryable, self.type_col, raise_on_missing=True)
            )

        def make_kind_col() -> tuple[str, sa.ColumnElement]:
            if self.kind_col is not None:
                return 'kind', sa.label('kind', col_by_name(base_queryable, self.kind_col, raise_on_missing=True))
            # Constant kind: emit a single-quoted SQL string literal (embedded quotes doubled).
            # Double quotes denote identifiers in standard SQL (e.g. PostgreSQL) and would be
            # resolved as a column reference instead of a constant.
            escaped_key = concept_properties.key.replace("'", "''")
            return 'kind', sa.literal_column(f"'{escaped_key}'", MITMDataType.Text.sa_sql_type).label('kind')

        def make_identity_cols() -> list[tuple[str, sa.ColumnElement]]:
            return [
                (q, sa.label(q, col_by_name(base_queryable, col, raise_on_missing=True)))
                for q, col in self.identity_provider.identity_columns.items()
            ]

        def make_inline_relation_cols() -> list[tuple[str, sa.ColumnElement]]:
            return [
                (q, sa.label(q, col_by_name(base_queryable, col, raise_on_missing=True)))
                for q, col in self.inline_relations_provider.inline_relations.items()
            ]

        def make_foreign_relation_cols() -> list[tuple[str, sa.ColumnElement]]:
            return [
                (q, sa.label(q, col_by_name(base_queryable, col, raise_on_missing=True)))
                for fk_rel in self.foreign_relations_provider.foreign_relations.values()
                for q, col in fk_rel.fk_columns_.items()
            ]

        def make_attribute_cols() -> list[tuple[str, sa.ColumnElement]]:
            return [
                (f'a_{i}', sa.label(f'a_{i}', col_by_name(base_queryable, a, raise_on_missing=True)))
                for i, a in enumerate(self.attributes, 1)
            ]

        selected_columns, created_columns = map_col_groups(
            mitm_def,
            self.concept,
            {
                'kind': make_kind_col,
                'type': make_type_col,
                'identity': make_identity_cols,
                'inline': make_inline_relation_cols,
                'foreign': make_foreign_relation_cols,
                'attributes': make_attribute_cols,
            },
        )

        queryable = sa.select(*selected_columns).select_from(base_queryable).subquery()

        header_entry_provider = HeaderEntryProvider(
            concept=self.concept,
            table_meta=table_meta,
            kind_provider=ColumnContentProvider.from_tuple(
                make_kind_col(), 'kind' in created_columns, concept_properties.key
            ),
            type_provider=ColumnContentProvider.from_tuple(
                make_type_col(), concept_properties.typing_concept in created_columns
            ),
            attributes=self.attributes,
            attribute_dtypes=self.attribute_dtypes,
        )
        return header_entry_provider, queryable

DBMapping

Bases: BaseModel

This model bundles a collection of ConceptMappings of individual tables.

Source code in mitm_tooling/extraction/relational/mapping/db_mapping.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class DBMapping(BaseModel):
    """
    This model bundles a collection of `ConceptMappings` of individual tables.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    mitm: MITM
    concept_mappings: list[ConceptMapping]

    @model_validator(mode='after')
    def post_val(self) -> Self:
        """
        Ensure that every concept mapping targets this mapping's MITM.

        Raises:
            ValueError: if any concept mapping belongs to a different MITM. Pydantic reports this to the caller
                as a `pydantic.ValidationError` during model construction.
        """
        if any(cm.mitm != self.mitm for cm in self.concept_mappings):
            # Validators must raise ValueError/AssertionError; pydantic's ValidationError cannot be
            # instantiated from a plain message and is produced by pydantic itself.
            raise ValueError('All mappings must belong to the same MitM')
        return self

DataProvider

Bases: BaseModel

This model represents a data provider for instances and types of a MITM concept.

Source code in mitm_tooling/extraction/relational/mapping/concept_mapping.py
132
133
134
135
136
137
138
139
140
141
class DataProvider(pydantic.BaseModel):
    """
    This model represents a data provider for instances and types of a MITM concept.

    It combines the source of the concept's instances, the post-processing applied to them,
    and the provider that derives the corresponding header entries (type definitions).
    """

    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)

    # Source of the instances; `Exportable` reads it via `apply_db` / `apply_db_chunked`.
    instance_provider: InstancesProvider
    # Post-processing applied to every fetched DataFrame (or chunk) via `apply_df`.
    instance_postprocessor: InstancesPostProcessor
    # Derives header entries from post-processed instances (`apply_df`) or directly from the DB (`apply_db`).
    header_entry_provider: HeaderEntryProvider

Exportable

Bases: BaseModel

This model represents an ETL export of (mapped) MITM data from a relational database. The data_providers attribute is a dictionary mapping concept names to lists of DataProviders. A DataProvider represents a source of instances for a specific concept in the form of a VirtualView, a post-processing pipeline, and a HeaderEntryProvider.

The Exportable is not bound to a specific database connection but rather represents an ETL pipeline from a relational DB to MITM Data. By providing a bind to a database, it can be used to generate MITMData and StreamingMITMData, or be exported to a zip file (also in a fully streamed fashion). The header data can also be generated without necessarily querying all the instances. In contrast to StreamingMITMData, the Exportable is expected to be reusable, i.e., its data sources are not read-once.

Note: This model is not serializable as it contains VirtualViews which contain SQLAlchemy objects. Essentially, it is indirectly bound to specific database metadata. Consider StandaloneDBMapping for a serializable representation that can be turned into an Exportable.

Source code in mitm_tooling/extraction/relational/mapping/export.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class Exportable(pydantic.BaseModel):
    """
    An ETL export of (mapped) MITM data from a relational database.

    `data_providers` maps concept names to lists of `DataProviders`. Each `DataProvider` is a source of
    instances of one concept, in the form of a `VirtualView`, a post-processing pipeline, and a `HeaderEntryProvider`.

    An `Exportable` is not bound to a database connection; it describes the pipeline from a relational DB to MITM data.
    Given a bind, it produces `MITMData` or `StreamingMITMData`, or can be exported to a zip file (also in a fully
    streamed fashion). The header can be generated without querying all instances.
    Unlike `StreamingMITMData`, an `Exportable` is reusable: its data sources are not read-once.

    Note:
        This model is not serializable, as it contains `VirtualViews` that hold SQLAlchemy objects.
        It is therefore indirectly bound to specific database metadata.
        `StandaloneDBMapping` is a serializable representation that can be turned into an `Exportable`.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    mitm: MITM
    data_providers: dict[ConceptName, list[DataProvider]]
    filename: str | None = None

    @property
    def generalized_data_providers(self) -> dict[ConceptName, list[DataProvider]]:
        """The data providers regrouped by the main (parent) concept of each concept."""
        mitm_def = get_mitm_def(self.mitm)
        grouped = {}
        for concept, providers in self.data_providers.items():
            grouped.setdefault(mitm_def.get_parent(concept), []).extend(providers)
        return grouped

    def generate_header(self, bind: AnyDBBind) -> Header:
        """Collect the header entries of all data providers by querying `bind`."""
        entries = []
        for providers in self.data_providers.values():
            for provider in providers:
                entries.extend(provider.header_entry_provider.apply_db(bind))
        return Header(mitm=self.mitm, header_entries=frozenset(entries))

    def generate_mitm_data(self, bind: AnyDBBind) -> MITMData:
        """
        Fetch and post-process all instances from `bind`, concatenating them per main concept.
        The header entries are derived from the fetched instances.
        """
        collected_entries = []
        concept_dfs = {}
        for main_concept, providers in self.generalized_data_providers.items():
            frames = []
            for provider in providers:
                raw = provider.instance_provider.apply_db(bind)
                frame = provider.instance_postprocessor.apply_df(raw)
                frames.append(frame)
                collected_entries.extend(provider.header_entry_provider.apply_df(frame))
            concept_dfs[main_concept] = pd.concat(frames, axis='index', ignore_index=True)

        header = Header(mitm=self.mitm, header_entries=frozenset(collected_entries))
        return MITMData(header=header, concept_dfs=concept_dfs)

    def generate_streaming_mitm_data(
        self, bind: AnyDBBind, streaming_chunk_size: int = STREAMING_CHUNK_SIZE
    ) -> StreamingMITMData:
        """
        Build lazily evaluated, chunked data sources per main concept.

        Each chunk is post-processed, its header entries are derived, and it is reindexed to the concept
        file columns. The columns are computed from the largest `type_arity` among the concept's providers.
        """

        def iter_chunks(
            provider: DataProvider, columns: tuple
        ) -> Iterator[tuple[pd.DataFrame, list[HeaderEntry]]]:
            for chunk in provider.instance_provider.apply_db_chunked(bind, streaming_chunk_size):
                chunk = provider.instance_postprocessor.apply_df(chunk)
                entries = provider.header_entry_provider.apply_df(chunk)
                # only adds NaN columns so that every chunk has all k attribute columns of the concept file
                chunk = chunk.reindex(columns=list(columns), copy=False)
                yield chunk, entries

        data_sources = {}
        for main_concept, providers in self.generalized_data_providers.items():
            k = max(provider.header_entry_provider.type_arity for provider in providers)
            concept_file_columns = mk_concept_file_header(self.mitm, main_concept, k)[0]
            structure_df = pd.DataFrame(columns=concept_file_columns)
            column_tuple = tuple(concept_file_columns)
            data_sources[main_concept] = StreamingConceptData(
                structure_df=structure_df,
                chunk_iterators=[iter_chunks(provider, column_tuple) for provider in providers],
            )

        return StreamingMITMData(mitm=self.mitm, data_sources=data_sources)

    def bind(self, bind: AnyDBBind) -> BoundExportable:
        """Bind this exportable to a database connection."""
        return BoundExportable(bind=bind, exportable=self)

HeaderEntry

Bases: BaseModel

This (immutable) model represents a single entry in a Header, i.e., a type definition.

Source code in mitm_tooling/representation/intermediate/header.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class HeaderEntry(pydantic.BaseModel):
    """
    This (immutable) model represents a single entry in a `Header`, i.e., a type definition.

    An entry records the concept, its kind key, the type name, and the type's attributes
    together with one data type per attribute.
    """

    model_config = ConfigDict(frozen=True)

    concept: ConceptName
    kind: str
    type_name: TypeName
    attributes: tuple[ColumnName, ...]
    attribute_dtypes: tuple[MITMDataType, ...]

    @pydantic.model_validator(mode='after')
    def attr_check(self) -> Self:
        """Ensure that exactly one data type is declared per attribute."""
        if len(self.attributes) != len(self.attribute_dtypes):
            raise MITMSyntacticError('Length of specified attributes and their data types differs.')
        return self

    @classmethod
    def of(
        cls,
        mitm: MITM,
        concept: ConceptName,
        type_name: TypeName,
        *attrs: tuple[ColumnName, MITMDataType],
    ) -> Self:
        """
        Create an entry for `concept` in `mitm` from `(attribute, data type)` pairs.

        Raises:
            MITMSyntacticError: if the concept has no properties in the MITM definition.
        """
        if (props := get_mitm_def(mitm).get_properties(concept)) is not None:
            attributes, attribute_dtypes = zip(*attrs, strict=False) if attrs else ([], [])
            return cls(
                concept=concept,
                kind=props.key,
                type_name=type_name,
                attributes=tuple(attributes),
                attribute_dtypes=tuple(attribute_dtypes),
            )
        else:
            raise MITMSyntacticError(f'Concept {concept} does not exist in MITM definition {mitm}.')

    @classmethod
    def from_row(cls, row: Sequence[str], mitm: MITM) -> Self:
        """
        Parse a header row of the form `[kind, type_name, attr_1, dtype_1, attr_2, dtype_2, ...]`.

        Attribute/dtype pairs where either cell is NA are skipped. An empty dtype cell maps to
        `MITMDataType.Unknown`.

        Raises:
            MITMTypeError: if `kind` is not a concept key of `mitm`, or a data type is not recognized.
        """
        kind, type_name = row[0], row[1]
        concept = get_mitm_def(mitm).inverse_concept_key_map.get(kind)
        if not concept:
            raise MITMTypeError(f'Encountered unknown concept key: "{kind}".')

        attrs, attr_dts = [], []
        for a, a_dt in zip(row[slice(2, None, 2)], row[slice(3, None, 2)], strict=False):
            if pd.notna(a) and pd.notna(a_dt):
                attrs.append(a)
                try:
                    # str() so that non-string cells are reported as unrecognized types, not AttributeErrors
                    mitm_dt = MITMDataType(str(a_dt).lower()) if a_dt else MITMDataType.Unknown
                    attr_dts.append(mitm_dt)
                except ValueError as e:
                    raise MITMTypeError(f'Encountered unrecognized data type during header import: {a_dt}.') from e

        return cls(
            concept=concept, kind=kind, type_name=type_name, attributes=tuple(attrs), attribute_dtypes=tuple(attr_dts)
        )

    def iter_attr_dtype_pairs(self) -> Iterable[tuple[ColumnName, MITMDataType]]:
        """Iterate over `(attribute, data type)` pairs."""
        return zip(self.attributes, self.attribute_dtypes, strict=False)

    @cached_property
    def attr_k(self) -> int:
        """Number of attributes."""
        return len(self.attributes)

    def to_row(self) -> list[str | None]:
        """Serialize to a header row `[kind, type_name, attr_1, dtype_1, ...]` (inverse of `from_row`)."""
        return [self.kind, self.type_name] + list(
            itertools.chain(*zip(self.attributes, map(str, self.attribute_dtypes), strict=False))
        )

    @cached_property
    def attr_name_map(self) -> dict[ColumnName, ColumnName]:
        """Map the anonymized attribute column names (from `mk_attr_columns`) to the actual attribute names."""
        return {a_anon: a for a_anon, a in zip(mk_attr_columns(self.attr_k), self.attributes, strict=False)}

MappingExport

Bases: BaseModel

This model represents a mapping of relational data to MITM data, including optional post processing. It is used as an intermediate representation for an ETL pipeline that is yet to be bound to specific database metadata.

Source code in mitm_tooling/extraction/relational/mapping/export.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class MappingExport(pydantic.BaseModel):
    """
    This model represents a mapping of relational data to MITM data, including optional post-processing.
    It is used as an intermediate representation for an ETL pipeline that is yet to be bound to specific database metadata.
    """

    mitm: MITM
    concept_mappings: list[ConceptMapping]
    post_processing: PostProcessing | None = None
    filename: str | None = None

    def apply(self, db_metas: dict[SourceDBType, DBMetaInfo]) -> Exportable:
        """
        Bind this mapping to concrete database metadata and produce an `Exportable`.

        Concept mappings whose `mitm` differs from `self.mitm` are skipped.
        Each remaining concept mapping is turned into a `VirtualView` in the `export` schema.
        That view is wrapped in a `DataProvider`, together with all post-processing transforms
        whose target table equals the mapping's base table.

        :param db_metas: the database metadata the concept mappings are resolved against, keyed by source DB type
        :return: an `Exportable` whose data providers are grouped by concept
        :raises ConceptMappingException: if applying a concept mapping raises `TableNotFoundException`
        """
        data_providers: dict[ConceptName, list[DataProvider]] = {}

        meta = sa.MetaData(schema='export')
        for i, concept_mapping in enumerate(self.concept_mappings):
            if concept_mapping.mitm != self.mitm:
                continue

            try:
                header_entry_provider, q = concept_mapping.apply(db_metas)
            except TableNotFoundException as e:
                raise ConceptMappingException('Concept Mapping failed.') from e

            concept = concept_mapping.concept

            # The enumeration index makes view names distinct even when several mappings share a concept.
            vv = VirtualView.from_from_clause(f'{concept}_{i}', q, meta, schema='export')
            instances_provider = InstancesProvider(virtual_view=vv)

            pp_transforms = []
            if self.post_processing is not None:
                # Flatten the per-table transform sequences into a single list.
                # A plain `itertools.chain(<generator>)` would yield each sequence as one element
                # and produce a list of lists.
                pp_transforms = list(
                    itertools.chain.from_iterable(
                        tpp.transforms
                        for tpp in self.post_processing.table_postprocessing
                        if TableIdentifier.check_equal(tpp.target_table, concept_mapping.base_table)
                    )
                )
            post_processor = InstancesPostProcessor(transforms=pp_transforms)

            data_providers.setdefault(concept, []).append(
                DataProvider(
                    instance_provider=instances_provider,
                    instance_postprocessor=post_processor,
                    header_entry_provider=header_entry_provider,
                )
            )

        return Exportable(mitm=self.mitm, data_providers=data_providers, filename=self.filename)

StandaloneDBMapping

Bases: DBMapping

This model extends a DBMapping with a VirtualDBCreation to create a standalone DBMapping. The concept_mappings can refer to tables in the original DB or the virtual DB defined by the VirtualDBCreation. A StandaloneDBMapping is serializable and intended to be used as an exchangeable ETL pipeline representation.

By providing an SQLAlchemy Engine, it can be turned into an ExecutableDBMapping or directly into an Exportable.

Source code in mitm_tooling/extraction/relational/mapping/db_mapping.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class StandaloneDBMapping(DBMapping):
    """
    A `DBMapping` that carries its own `VirtualDBCreation`, making it self-contained.

    Its `concept_mappings` may reference tables of the original DB as well as tables of the
    virtual DB described by `virtual_db_creation`. Being serializable, a `StandaloneDBMapping`
    serves as an exchangeable representation of an ETL pipeline.

    Given an SQLAlchemy `Engine`, it can be converted into an `ExecutableDBMapping` or straight into an `Exportable`.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    virtual_db_creation: VirtualDBCreation

    def recreate_virtual_db(
        self, remote_engine: sa.Engine, queryable_verifier: Callable[[Queryable], bool] | None = None
    ) -> tuple[VirtualDB, dict[SourceDBType, DBMetaInfo]]:
        """
        Reflect `remote_engine` into DB metadata and rebuild the virtual DB on top of it.

        :param remote_engine: the engine of the original database
        :param queryable_verifier: optional predicate forwarded to `VirtualDBCreation.apply`
        :return: the recreated virtual DB and the metadata of both the original and the virtual DB
        """
        from mitm_tooling.transformation.sql import db_engine_into_db_meta

        source_meta = db_engine_into_db_meta(remote_engine)
        virtual_db = self.virtual_db_creation.apply(source_meta, queryable_verifier=queryable_verifier)
        metas = {
            SourceDBType.OriginalDB: source_meta,
            SourceDBType.VirtualDB: virtual_db.to_db_meta_info(),
        }
        return virtual_db, metas

    def to_executable(self, remote_engine: sa.Engine) -> ExecutableDBMapping:
        """Combine this mapping with freshly recreated DB metadata into an `ExecutableDBMapping`."""
        db_metas = self.recreate_virtual_db(remote_engine)[1]
        return ExecutableDBMapping(mitm=self.mitm, concept_mappings=self.concept_mappings, db_metas=db_metas)

    def to_exportable(self, remote_engine: sa.Engine, filename: str | None = None) -> Exportable:
        """Shortcut for `self.to_executable(remote_engine).to_exportable(filename)`."""
        executable = self.to_executable(remote_engine)
        return executable.to_exportable(filename)

MITM Data Extraction from Files/DataFrames

ConvContext

A basic conversion context. It supports conversion operations on the bound Engine.

Source code in mitm_tooling/extraction/anything/conversion_context.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class ConvContext:
    """
    Conversion context bound to a single SQLAlchemy `Engine`.
    Every operation of this context is executed against that `Engine`.
    """

    def __init__(self, engine: sa.Engine):
        self.engine = engine

    def apply_db_mapping(self, db_mapping: StandaloneDBMapping) -> BoundExportable:
        """
        Convert a `StandaloneDBMapping` into an `Exportable` for the bound `Engine` and bind it to that `Engine`.
        The result can be used to initiate a (streamed) export.

        See `Exportable`.

        :param db_mapping: the `StandaloneDBMapping` to apply
        :return: the `Exportable`, bound to this context's `Engine`
        """
        engine = self.engine
        return db_mapping.to_exportable(engine).bind(engine)

    def probe(self) -> DBProbe:
        """
        Probe the database behind the bound `Engine` via `probe_engine`.
        The resulting probe describes the database schema and includes sample-based
        inference of column types and value summaries.

        See `DBProbe`.

        :return: the probe result
        """
        engine = self.engine
        return probe_engine(engine)

    def gen_llm_context(self, mitm: MITM, folder_path: FilePath | None = None) -> dict[str, str]:
        """
        Build textual context that an LLM can use to suggest a `StandaloneDBMapping`.
        Optionally, also save that context to disk.

        :param mitm: the target MITM
        :param folder_path: if given, the folder the context files are written to
        :return: a mapping from file names to file contents
        """
        bundle = mk_llm_context_bundle(mitm, self.probe())
        if folder_path is None:
            return bundle
        save_text_files(folder_path, bundle)
        return bundle

apply_db_mapping(db_mapping: StandaloneDBMapping) -> BoundExportable

Apply a StandaloneDBMapping to the bound Engine, returning a BoundExportable. It can be used to initiate a (streamed) export.

See Exportable.

Parameters:

Name Type Description Default
db_mapping StandaloneDBMapping

the StandaloneDBMapping to apply

required

Returns:

Type Description
BoundExportable
Source code in mitm_tooling/extraction/anything/conversion_context.py
200
201
202
203
204
205
206
207
208
209
210
211
def apply_db_mapping(self, db_mapping: StandaloneDBMapping) -> BoundExportable:
    """
    Convert a `StandaloneDBMapping` into an `Exportable` for the bound `Engine` and bind it to that `Engine`.
    The result can be used to initiate a (streamed) export.

    See `Exportable`.

    :param db_mapping: the `StandaloneDBMapping` to apply
    :return: the `Exportable`, bound to this context's `Engine`
    """
    engine = self.engine
    return db_mapping.to_exportable(engine).bind(engine)

gen_llm_context(mitm: MITM, folder_path: FilePath | None = None) -> dict[str, str]

Generate, and optionally save, some textual context for an LLM to use for suggesting a StandaloneDBMapping.

Parameters:

Name Type Description Default
mitm MITM

the target MITM

required
folder_path FilePath | None

optionally, a folder to save the context files to

None

Returns:

Type Description
dict[str, str]

a dictionary mapping file names to file contents

Source code in mitm_tooling/extraction/anything/conversion_context.py
224
225
226
227
228
229
230
231
232
233
234
235
236
def gen_llm_context(self, mitm: MITM, folder_path: FilePath | None = None) -> dict[str, str]:
    """
    Build textual context that an LLM can use to suggest a `StandaloneDBMapping`.
    Optionally, also save that context to disk.

    :param mitm: the target MITM
    :param folder_path: if given, the folder the context files are written to
    :return: a mapping from file names to file contents
    """
    bundle = mk_llm_context_bundle(mitm, self.probe())
    if folder_path is None:
        return bundle
    save_text_files(folder_path, bundle)
    return bundle

probe() -> DBProbe

Probe the database using the bound Engine. The resulting probe contains information about the database schema and some sample-based inference of column types and value summaries.

See DBProbe.

Returns:

Type Description
DBProbe

the probe result

Source code in mitm_tooling/extraction/anything/conversion_context.py
213
214
215
216
217
218
219
220
221
222
def probe(self) -> DBProbe:
    """
    Probe the database behind the bound `Engine` via `probe_engine`.
    The resulting probe describes the database schema and includes sample-based
    inference of column types and value summaries.

    See `DBProbe`.

    :return: the probe result
    """
    engine = self.engine
    return probe_engine(engine)

MutatingConvContext

Bases: ConvContext

A conversion context that allows mutating operations, e.g., importing DataFrames into the database. See ConvContext.

Source code in mitm_tooling/extraction/anything/conversion_context.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
class MutatingConvContext(ConvContext):
    """
    Conversion context that additionally permits mutating operations on the bound `Engine`,
    such as importing `DataFrames` into the database.
    See `ConvContext`.
    """

    def __init__(self, engine: sa.Engine):
        super().__init__(engine)

    def add_dfs(self, df_loaders: dict[str, Callable[[], pd.DataFrame]]) -> list[tuple[TableName, int]]:
        """
        Insert the `DataFrames` produced by `df_loaders` into the database behind the bound `Engine`,
        delegating to `dump_dfs_into_sqlite`.

        :param df_loaders: table names mapped to zero-argument callables returning `DataFrames`
        :return: the `(table name, int)` pairs reported by `dump_dfs_into_sqlite` for the inserted tables
        """
        _unused, inserted_tables = dump_dfs_into_sqlite(df_loaders, engine=self.engine)
        return inserted_tables

add_dfs(df_loaders: dict[str, Callable[[], pd.DataFrame]]) -> list[tuple[TableName, int]]

Insert the DataFrames provided by the df_loaders into the database using the bound Engine.

Parameters:

Name Type Description Default
df_loaders dict[str, Callable[[], DataFrame]]

a mapping of table names to callables that return DataFrames

required

Returns:

Type Description
list[tuple[TableName, int]]

some information about the inserted tables

Source code in mitm_tooling/extraction/anything/conversion_context.py
248
249
250
251
252
253
254
255
256
257
def add_dfs(self, df_loaders: dict[str, Callable[[], pd.DataFrame]]) -> list[tuple[TableName, int]]:
    """
    Insert the `DataFrames` produced by `df_loaders` into the database behind the bound `Engine`,
    delegating to `dump_dfs_into_sqlite`.

    :param df_loaders: table names mapped to zero-argument callables returning `DataFrames`
    :return: the `(table name, int)` pairs reported by `dump_dfs_into_sqlite` for the inserted tables
    """
    _unused, inserted_tables = dump_dfs_into_sqlite(df_loaders, engine=self.engine)
    return inserted_tables

external_conv_ctxt(sql_alchemy_url: str | AnyUrl) -> Generator[ConvContext, None, None]

A context manager that yields a conversion context backed by an external SQLAlchemy engine. The yielded ConvContext does not support mutating operations (e.g., adding DataFrames).

Parameters:

Name Type Description Default
sql_alchemy_url str | AnyUrl

the SQLAlchemy connection URL of the external database

required

Returns:

Type Description
Generator[ConvContext, None, None]
Source code in mitm_tooling/extraction/anything/conversion_context.py
280
281
282
283
284
285
286
287
288
289
290
@contextlib.contextmanager
def external_conv_ctxt(sql_alchemy_url: str | AnyUrl) -> Generator[ConvContext, None, None]:
    """
    Context manager yielding a `ConvContext` for an external database reached through SQLAlchemy.
    The yielded context is a plain `ConvContext`, so mutating operations (e.g., adding `DataFrames`) are not offered.

    :param sql_alchemy_url: the SQLAlchemy connection URL of the external database
    :return: a generator yielding the `ConvContext` while the engine context from `external_sql_ctxt` is open
    """
    with external_sql_ctxt(sql_alchemy_url) as external_engine:
        ctxt = ConvContext(external_engine)
        yield ctxt

local_conv_ctxt(variant: Literal['memory', 'tempdir'] = 'memory') -> Generator[MutatingConvContext, None, None]

A context manager that yields a mutating conversion context backed by a transient local SQLite engine. The database can be either in-memory or on disk (in a temporary directory).

Parameters:

Name Type Description Default
variant Literal['memory', 'tempdir']

the variant of the local SQLite DB to use, either 'memory' or 'tempdir'

'memory'

Returns:

Type Description
Generator[MutatingConvContext, None, None]
Source code in mitm_tooling/extraction/anything/conversion_context.py
267
268
269
270
271
272
273
274
275
276
277
@contextlib.contextmanager
def local_conv_ctxt(variant: Literal['memory', 'tempdir'] = 'memory') -> Generator[MutatingConvContext, None, None]:
    """
    Context manager yielding a `MutatingConvContext` backed by a transient local SQLite engine.
    The database lives either in memory or on disk (in a temporary directory).

    :param variant: which local SQLite DB to use, either 'memory' or 'tempdir'
    :return: a generator yielding the `MutatingConvContext` while the engine context from `sqlite_ctxt` is open
    """
    with sqlite_ctxt(variant) as local_engine:
        ctxt = MutatingConvContext(local_engine)
        yield ctxt