diff --git a/.github/workflows/data-update.yml b/.github/workflows/data-update.yml index 8a61b9a..e37a126 100644 --- a/.github/workflows/data-update.yml +++ b/.github/workflows/data-update.yml @@ -34,6 +34,9 @@ jobs: - name: Update from various data sources... run: ./update.sh + - name: Generate derivatives... + run: ./derive.sh + - name: Deploy updated site... run: ./deploy.sh env: diff --git a/.gitignore b/.gitignore index edaacdd..23d8fde 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,8 @@ /bin /pywikibot.lwp /passwordfile +*.pyc +/registries.db +/.venv +/build +/data diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8d53fe8 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ + +all: registries.db + +registries.db: foreging/*.py digipres.github.io/_sources/registries/* + rm -fr data + mkdir -p data + python -m foreging.populate --jsonl data + cp data/registries.db digipres.github.io/_data/formats/registries.db + cp data/*.parquet digipres.github.io/_data/formats/index/ diff --git a/README.md b/README.md index dce7756..1e07311 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,8 @@ To Do * http://en.wikipedia.org/wiki/Alphabetical_list_of_filename_extensions_%28M%E2%80%93R%29 * http://www.webarchive.org.uk/aadda-discovery/formats?f[0]=content_type_ext:%22.bmp%22 * https://twitter.com/benfinoradin/status/532212803630039041 +* Talk about how to use `git submodule update --recursive --remote` to make sure `pywikibot` and `digipres.github.io` are up to date. +* Using `uvx datasette serve data/registries.db` to quickly poke around in the database. COPTR Bot --------- diff --git a/aggregates.py b/aggregates.py index b7ffd1f..6c4adef 100644 --- a/aggregates.py +++ b/aggregates.py @@ -86,7 +86,6 @@ def addFormat(rid,fid,finfo): # And add: fmts[rid]['formats'][fid] = finfo - def aggregateFDD(): rid = "fdd" print("Parsing %s..." 
% rid) @@ -132,6 +131,7 @@ def aggregateFDD(): if rid in fmts: # FIXME this needs to be more robust, rather than relying on happening after 'addFormat' is called for the first time. fmts[rid]['warnings'].append(f"Error when parsing XML from '{filename}': {e}") + def aggregateTRiD(): rid = "trid" print("Parsing %s..." % rid) @@ -394,9 +394,9 @@ def aggregateWikiData(): with open("%s/extensions.yml" % data_dir, 'w') as outfile: outfile.write( yaml.safe_dump(extensions, default_flow_style=False) ) -# Write out Venn data +# Write out Venn data, starting from a list like [extension] -> Registry_ID: print("Outputting Venn data based on extensions...") -# Key all the RID-to-integer mappings: +# Key all the Registry_ID-to-integer mappings: vennls = {} i = 0 for fmt in fmts: @@ -407,15 +407,19 @@ def aggregateWikiData(): venndsl = defaultdict(list) vennlt = defaultdict(int) vennids = {} +# Loop over all extensions: for extension in exts: regs = set() regIds = set() + # Loop over each registry the extension appears in: for ridder in exts[extension]['identifiers']: regs.add(vennls[ridder['regId']]) regIds.add(ridder['regId']) for rid in regs: vennlt[rid] += 1 + # Build a unique key for each registry combination: key = ','.join(sorted(regs)) + # Use the key to build up each overlap set: vennids[key] = sorted(regIds) venndsl[key].append(extension) vennds[key] += 1 diff --git a/derive.sh b/derive.sh new file mode 100755 index 0000000..5f9ce49 --- /dev/null +++ b/derive.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -e + +source venv/bin/activate + +make + +cp data/registries.db digipres.github.io/_data/formats/ +cp data/*.parquet digipres.github.io/_data/formats/index + diff --git a/digipres.github.io b/digipres.github.io index 7d0a158..35f8c1a 160000 --- a/digipres.github.io +++ b/digipres.github.io @@ -1 +1 @@ -Subproject commit 7d0a158a697769afa5482e3c929fc968e070ec3d +Subproject commit 35f8c1ae870ac2aafab8f921753a2c682c4ace24 diff --git a/foreging/__init__.py b/foreging/__init__.py 
new file mode 100644 index 0000000..4c04cbc --- /dev/null +++ b/foreging/__init__.py @@ -0,0 +1 @@ +# FOrmat REGistry INdexinG \ No newline at end of file diff --git a/foreging/coptr.py b/foreging/coptr.py new file mode 100644 index 0000000..bb9b8f1 --- /dev/null +++ b/foreging/coptr.py @@ -0,0 +1,72 @@ +import mwclient as mw +from mwclient.listing import Category, PageList +import mwparserfromhell + +from .models import Software + +import logging +logging.basicConfig(level=logging.WARNING) + + +coptr_host = 'coptr.digipres.org' +user_agent = 'DigiPresFormatIndexClient/0.1 (andrew.jackson@dpconline.org)' +site = mw.Site(coptr_host, path='/', clients_useragent=user_agent) + +#for tool_page in site.allpages(): +# pass + +#category = site.categories[u"Tool Grid"] +#for page in category: +# print(page.name) + + +# {{Infobox tool +# |image=JHOVE.gif +# |purpose=JHOVE provides functions to perform format-specific identification, validation, and characterization of digital objects. +# |homepage=http://jhove.openpreservation.org/ +# |license=GNU Lesser General Public License (LGPL) +# |platforms=JHOVE should be usable on any UNIX, Windows, or OS X platform with an appropriate J2SE installation. It should run on any operating system that supports Java 1.5 and has a directory-based file system. +# |formats_in=EPUB, GIF, JP2, JPEG, PDF, PNG, PREMIS (Preservation Metadata Implementation Strategies), TIFF, WARC, XML, AIFF, WAVE, GZIP, ASCII, UTF-8, HTML, MP3 +# |function=Encryption Detection, File Format Identification, Metadata Extraction, Validation +# }} + +# FIXME this does both at once! One should write the page info needed to JSON. The other should use it. +# But, we don't know everything we need yet, I guess? 
+ +category: PageList = site.categories[u"Tools"] +for page in category: + print(page.name) + text = page.text() + wikicode = mwparserfromhell.parse(text) + templates = wikicode.filter_templates(matches='infobox tool') + template = templates[0] + formats = template.get("formats_in", None) + if formats: + formats = [f.strip() for f in formats.value.split(",")] + print(f" < {formats}") + formats = template.get("formats_out", None) + if formats: + formats = [f.strip() for f in formats.value.split(",")] + print(f" > {formats}") + print(page.pageid) + if isinstance(page, Category): + for member in page.members(): + print(f"{page.name} > {member.name}") + else: + pass + s = Software( + id=f"coptr:pageid:{page.pageid}", + name=page.name, + version=None, + license=None, + registry_url=f"https://{coptr_host}/Special:Redirect/page/{page.pageid}" + ) + license = template.get('license', None) + if license: + s.license = license.value.strip() + print(s) + + + +# Workflows in Workflow namespace +# Formats is another potential category, but needs patching in via external IDs. 
\ No newline at end of file diff --git a/foreging/db/__init__.py b/foreging/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/foreging/db/extension_sets.py b/foreging/db/extension_sets.py new file mode 100644 index 0000000..033e236 --- /dev/null +++ b/foreging/db/extension_sets.py @@ -0,0 +1,43 @@ +import json +import sqlite3 +import logging +import argparse +from collections import defaultdict + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def generate_ext_sets(db): + con = sqlite3.connect(db) + + cur = con.cursor() + + ext_sets = defaultdict(set) + ext_counts = defaultdict(int) + for row in cur.execute("SELECT registry_id, format.id, e.value FROM format, json_each(extensions) AS e ORDER BY e.value ASC"): + ext_sets[row[0]].add(row[2].lower().strip()) + ext_counts[row[0]] += 1 + + for source, ext_set in ext_sets.items(): + ext_sets[source] = list(ext_set) + logger.info(f"Registry {source} has {ext_counts[source]} extensions, of which {len(ext_set)} are unique. 
Ratio: {ext_counts[source]/len(ext_set)}") + return ext_sets, ext_counts + + +if __name__ == "__main__": + # Args setup: + parser = argparse.ArgumentParser() + parser.add_argument('input_db') + parser.add_argument('output_json') + args = parser.parse_args() + + # Query and return the sets of extensions: + ext_sets, ext_counts = generate_ext_sets(args.input_db) + + # Output the sets of extensions: + with open(args.output_json, 'w') as f: + json.dump(ext_sets, f) + + + diff --git a/foreging/db/models.py b/foreging/db/models.py new file mode 100644 index 0000000..0aa8a09 --- /dev/null +++ b/foreging/db/models.py @@ -0,0 +1,117 @@ +from datetime import date +from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, JSON, Column + + +class Registry(SQLModel, table=True): + id: str | None = Field(default=None, primary_key=True) + name: str = Field(index=True) + url: str | None = Field() + id_prefix: str | None = Field() + index_data_url: str | None = Field() + + data_log: list["RegistryDataLogEntry"] = Relationship() + + +class RegistryDataLogEntry(SQLModel, table=True): + __tablename__ = 'registry_data_log' + id: int | None = Field(default=None, primary_key=True) + level: str = Field(index=True) + message: str = Field() + url: str | None = Field() + + registry_id: str | None = Field(default=None, foreign_key="registry.id") + registry: Registry | None = Relationship(back_populates="data_log") + + # Define how to spot unique entries in a set + def __hash__(self): + return hash(self.message) + def __eq__(self,other): + return self.message == other.message + +class SoftwareReadsFormatLink(SQLModel, table=True): + __tablename__ = "formats_read_by_software" + format_id: str | None = Field(default=None, foreign_key="format.id", primary_key=True) + software_id: str | None = Field(default=None, foreign_key="software.id", primary_key=True) + +class SoftwareWritesFormatLink(SQLModel, table=True): + __tablename__ = "formats_written_by_software" + format_id: str | 
None = Field(default=None, foreign_key="format.id", primary_key=True) + software_id: str | None = Field(default=None, foreign_key="software.id", primary_key=True) + +class Software(SQLModel, table=True): + id: str | None = Field(default=None, primary_key=True) + name: str = Field(index=True) + version: str | None = Field(index=True) + summary: str | None = Field(index=True) + license: str | None = Field(index=True) + registry_url: str | None = Field(index=True) + + reads: list["Format"] = Relationship(back_populates="readers", link_model=SoftwareReadsFormatLink) + writes: list["Format"] = Relationship(back_populates="writers", link_model=SoftwareWritesFormatLink) + + registry_id: str | None = Field(default=None, foreign_key="registry.id") + registry: Registry | None = Relationship() + + # Define how to spot unique entries in a set + def __hash__(self): + return hash(self.id) + def __eq__(self,other): + return self.id == other.id + +class FormatGenresLink(SQLModel, table=True): + __tablename__ = "format_genres" + format_id: str | None = Field(default=None, foreign_key="format.id", primary_key=True) + genre_id: str | None = Field(default=None, foreign_key="genre.id", primary_key=True) + +class Genre(SQLModel, table=True): + id: int | None = Field(default=None, primary_key=True) + name: str = Field(index=True) + # + formats: list["Format"] = Relationship(back_populates="genres", link_model=FormatGenresLink) + + # Define how to spot unique entries in a set + def __hash__(self): + return hash(self.name) + def __eq__(self,other): + return self.name == other.name + +class MediaTypesFormatsLink(SQLModel, table=True): + __tablename__ = "format_media_types" + format_id: str | None = Field(default=None, foreign_key="format.id", primary_key=True) + media_type_id: str | None = Field(default=None, foreign_key="media_type.id", primary_key=True) + +class MediaType(SQLModel, table=True): + __tablename__ = "media_type" + id: str | None = Field(default=None, primary_key=True) + # + 
formats: list["Format"] = Relationship(back_populates="media_types", link_model=MediaTypesFormatsLink) + + # Define how to spot unique entries in a set + def __hash__(self): + return hash(self.id) + def __eq__(self,other): + return self.id == other.id + +class Format(SQLModel, table=True): + id: str | None = Field(default=None, primary_key=True) + name: str | None = Field(index=True) + version: str | None = Field(index=True) + summary: str | None = Field(index=True) + genres: list["Genre"] = Relationship(back_populates="formats", link_model=FormatGenresLink) + extensions: list[str] | None = Field(default=None, sa_column=Column(JSON)) + media_types: list["MediaType"] = Relationship(back_populates="formats", link_model=MediaTypesFormatsLink) + has_magic: bool = Field(default=False) + primary_media_type: str | None = Field(index=True) + parent_media_type: str | None = Field(index=True) + registry_url: str | None = Field(index=True) + registry_source_data_url: str | None = Field(index=True) + registry_index_data_url: str | None = Field(index=True) + created: date | None = Field(index=True) + last_modified: date | None = Field(index=True) + + readers: list["Software"] = Relationship(back_populates="reads", link_model=SoftwareReadsFormatLink) + writers: list["Software"] = Relationship(back_populates="writes", link_model=SoftwareWritesFormatLink) + + registry_id: str | None = Field(default=None, foreign_key="registry.id") + registry: Registry | None = Relationship() + diff --git a/foreging/ffw.py b/foreging/ffw.py new file mode 100644 index 0000000..d2f01ea --- /dev/null +++ b/foreging/ffw.py @@ -0,0 +1,78 @@ +import json +import yaml +import logging +from .models import Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +class FFW(RegistryClient): + source_file = "digipres.github.io/_sources/registries/mediawikis/ffw.yml" + # Set up the Registry object for this class: + registry_id = "ffw" + registry = 
Registry( + id=registry_id, + name="Just Solve The Problem File Formats Wiki", + url="http://fileformats.archiveteam.org/", + id_prefix='http://fileformats.archiveteam.org/wiki/', + index_data_url=f"https://github.com/digipres/digipres.github.io/blob/master/{source_file}" + ) + + def get_formats(self): + stream = open(self.source_file, 'r') + ffw = yaml.safe_load(stream) + stream.close() + + for fmt in ffw['formats']: + f_info = {} + f_info['extensions'] = set() + f_info['mimetypes'] = set() + f_info['categories'] = set() + f_info['hasMagic'] = False + ff_id = self.registry_id + ':' + fmt['name'] + for key in fmt: + if key == 'extensions': + for ext in fmt[key]: + if ext: + ext=ext.lower().strip() + f_info['extensions'].add(ext) + elif key == 'mimetypes': + f_info['mimetypes'] = fmt[key] + elif key == 'categories': + f_info['categories'] = fmt[key] + else: + f_info[key] = fmt[key] + # Released... + released_in: str = fmt.get('released', None) + # Drop empty entries: + if released_in == '': + released_in = None + # Drop any .... 
content: + if released_in: + ref_index = released_in.find("<ref") + if ref_index > -1: + released_in = released_in[0:ref_index] + + # Set up as a format entity: + f = Format( + registry_id=self.registry_id, + id=ff_id, + name=f_info['name'], + version=None, + summary=f_info.get('pageStartText', None), + genres=list(f_info['categories']), + extensions=list(f_info['extensions']), + media_types=list(f_info['mimetypes']), + has_magic=f_info['hasMagic'], + primary_media_type=None, + parent_media_type=None, + released_in=released_in, + registry_url=fmt['source'], + registry_source_data_url=fmt['source'], + registry_index_data_url=None, + created=None, + last_modified=None + ) + yield f \ No newline at end of file diff --git a/foreging/file.py b/foreging/file.py new file mode 100644 index 0000000..53bdb65 --- /dev/null +++ b/foreging/file.py @@ -0,0 +1,62 @@ +import json +import yaml +from lxml import etree +from io import BytesIO +import logging +from .models import Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +class File(RegistryClient): + source_file = "digipres.github.io/_sources/registries/file/polyfile-magic.jsonl" + source_url = "https://github.com/trailofbits/polyfile/tree/master/polyfile/magic_defs" + index_url = "https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/file/polyfile-magic.jsonl" + # Set up the Registry object for this class: + registry_id = "file" + registry = Registry( + id=registry_id, + name="file libmagic (via polyfile 0.55)", + url="https://www.darwinsys.com/file/", + id_prefix=None, + index_data_url=index_url + ) + + def get_formats(self): + fmts = [] + idx = 0 + with open(self.source_file, "r") as f: + for line in f.readlines(): + idx += 1 + entry = json.loads(line) + if 'extensions' in entry or 'types' in entry: + # Remove duplicate entries: + extensions = list(set(entry.get('extensions', []))) + media_types = list(set(entry.get('types', []))) + # Set up as a 
format entity: + f = Format( + registry_id=self.registry_id, + id=f"{self.registry_id}:{idx}", + name=entry["name"], + version=None, + summary=None, + genres=[], + extensions=extensions, + media_types=media_types, + has_magic=True, + primary_media_type=None, + parent_media_type=None, + registry_url=None, + registry_source_data_url=self.source_url, + registry_index_data_url=f"{self.index_url}#L{idx}", + created=None, + last_modified=None + ) + # And record the entry: + fmts.append(f) + + # Now yield them, so all the log entries get stored too: + for f in fmts: + yield f \ No newline at end of file diff --git a/foreging/linguist.py b/foreging/linguist.py new file mode 100644 index 0000000..b945f34 --- /dev/null +++ b/foreging/linguist.py @@ -0,0 +1,66 @@ +import json +import yaml +import logging +from .models import Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +class Linguist(RegistryClient): + source_file = "digipres.github.io/_sources/registries/githublinguist/languages.yml" + # Set up the Registry object for this class: + registry_id = "linguist" + registry = Registry( + id=registry_id, + name="GitHub Linguist", + url="https://github.com/github-linguist/linguist", + id_prefix=None, + index_data_url=None + ) + + def get_formats(self): + stream = open(self.source_file, 'r') + ghl = yaml.safe_load(stream) + stream.close() + + for fmt_name in ghl: + fmt = ghl[fmt_name] + f_info = {} + f_info['name'] = fmt_name + f_info['extensions'] = set() + f_info['mimetypes'] = set() + f_info['hasMagic'] = False + ff_id = f"{self.registry_id}:{fmt['language_id']}" + for key in fmt: + if key == 'extensions': + for ext in fmt[key]: + if ext: + ext=ext.strip('.') # Drop the prefix dot + f_info['extensions'].add(ext) + elif key == 'codemirror_mime_type': + f_info['mimetypes'].add(fmt[key]) + else: + f_info[key] = fmt[key] + + # Set up as a format entity: + f = Format( + registry_id=self.registry_id, + id=ff_id, 
+ name=f_info['name'], + version=None, + summary=None, + genres=[], + extensions=list(f_info['extensions']), + media_types=list(f_info['mimetypes']), + has_magic=f_info['hasMagic'], + primary_media_type=None, + parent_media_type=None, + registry_url=None, + registry_source_data_url=None, + registry_index_data_url=None, + created=None, + last_modified=None + ) + yield f \ No newline at end of file diff --git a/foreging/loc_fdd.py b/foreging/loc_fdd.py new file mode 100644 index 0000000..b5b589b --- /dev/null +++ b/foreging/loc_fdd.py @@ -0,0 +1,127 @@ +import os +import logging +import datetime +from bs4 import BeautifulSoup +from .models import Format, Registry, RegistryDataLogEntry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class LocFDD(RegistryClient): + registry_id = "lcfdd" + source_folder = 'digipres.github.io/_sources/registries/fdd/fddXML' + show_parsed_xml_on_errors = False + registry = Registry( + id=registry_id, + name="Library of Congress Format Description Documents", + url="https://www.loc.gov/preservation/digital/formats/", + id_prefix='https://www.loc.gov/preservation/digital/formats/fdd/' + ) + + def get_formats(self): + + fmts = {} + + for filename in os.listdir(self.source_folder): + if filename.endswith(".xml"): + logger.debug(f"Parsing {filename}...") + with open(f"{self.source_folder}/{filename}", "rb") as f: + xml = f.read() + root = None + ffd_id = None # Reset per file: the except handler logs it even when parsing fails before it is read + try: + # Alternative code that was more difficult to work with: + #parser = etree.XMLParser() + #root = etree.parse(BytesIO(xml), parser) + root = BeautifulSoup(xml, "xml") + ffd_id = root.find('FDD').get('id') + f_name = root.find('FDD').get('titleName') + + # Check if we should keep this one, or if something seems to have gone wrong: + if filename != f"{ffd_id}.xml": + self.registry.data_log.append( + RegistryDataLogEntry( + level="error", + message=f"File name of {filename} does not match embedded FDD ID of {ffd_id}", + 
url=f"https://www.loc.gov/preservation/digital/formats/fddXML/{filename}" + ) + ) + continue + + # If there's a version string, grab it: + f_version = None + if ", Version " in f_name: + f_version = f_name.split(", Version ", 1)[1] + # Genre: + f_genres = list() + for gns in root.findAll('gdfrGenreSelection'): + for gn in gns.findAll('gdfrGenre'): + f_genres.append(f"gdfr:{gn.text}") + # Haz Magic? + if root.find('magicNumbers'): + f_magic = True + else: + f_magic = False + # Get extensions: + f_extensions = set() + for fe in root.findAll('filenameExtension'): + for fev in fe.findAll('sigValue'): + ext = f"{fev.text}" + f_extensions.add(ext) + # Get MIME types: + f_mimetypes = set() + for imts in root.findAll('internetMediaType'): + for mt in imts.findAll('sigValue'): + f_mimetypes.add(mt.text) + # Find the date: + edit_date = root.findAll('date')[-1].text + try: + edit_date = datetime.date.fromisoformat(edit_date) + except ValueError: + self.registry.data_log.append(RegistryDataLogEntry( + level='warning', + message=f"Unexpected data format '{edit_date}' for record {ffd_id}, expected 'YYYY-MM-DD'.", + )) + edit_date = None + + # Create record: + f = Format( + registry_id=self.registry_id, + id=f"{self.registry_id}:{ffd_id}", + name=f_name, + version=f_version, + summary=root.find("shortDescription").text, + genres=f_genres, + extensions=list(f_extensions), + media_types=list(f_mimetypes), + has_magic=f_magic, + primary_media_type=None, + parent_media_type=None, + registry_url=f"https://www.loc.gov/preservation/digital/formats/fdd/{ffd_id}/", + registry_source_data_url=f"https://www.loc.gov/preservation/digital/formats/fddXML/{filename}", + registry_index_data_url=f"https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/fdd/fddXML/{ffd_id}.xml", + additional_fields= None, + created=edit_date, + last_modified=edit_date, + ) + fmts[ffd_id] = f + + except Exception as e: + logger.error(f"Parsing {filename} {ffd_id} failed: {e}") + 
self.registry.data_log.append( + RegistryDataLogEntry( + level='error', + message=f"Error when parsing XML from '{filename}': {e}" + ) + ) + # Emit extra debug info if possible: + if root and self.show_parsed_xml_on_errors: + logger.error("XML parsed as:") + logger.error(root.prettify()) + #print(etree.tostring(root, pretty_print=True).decode('utf-8')) + + # Return the values: + for id in fmts: + f = fmts[id] + yield f + diff --git a/foreging/mediainfo.py b/foreging/mediainfo.py new file mode 100644 index 0000000..4b0a9d4 --- /dev/null +++ b/foreging/mediainfo.py @@ -0,0 +1,63 @@ +import json +import yaml +from lxml import etree +from io import BytesIO +import logging +from .models import Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +class MediaInfo(RegistryClient): + source_file = "digipres.github.io/_sources/registries/mediainfo/mediainfo.jsonl" + source_url = "https://mediaarea.net/en/MediaInfo/Support/Formats" + index_url = "https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/mediainfo/mediainfo.jsonl" + # Set up the Registry object for this class: + registry_id = "mediainfo" + registry = Registry( + id=registry_id, + name="MediaInfo (WIP)", + url="https://mediaarea.net/en/MediaInfo/Support/Formats", + id_prefix=None, + index_data_url=index_url + ) + + def get_formats(self): + fmts = [] + idx = 0 + with open(self.source_file, "r") as f: + for line in f.readlines(): + idx += 1 + entry = json.loads(line) + if 'extensions' in entry or 'types' in entry: + # Do the ugly book-keeping to make the SQL work: + extensions = list() + for extension in set(entry.get('extensions', [])): + extensions.append(extension) + # Set up as a format entity: + f = Format( + registry_id=self.registry_id, + id=f"{self.registry_id}:{idx}", + name=entry["name"], + version=None, + summary=None, + genres=[], + extensions=extensions, + media_types=[], + has_magic=True, + 
primary_media_type=None, + parent_media_type=None, + registry_url=None, + registry_source_data_url=self.source_url, + registry_index_data_url=f"{self.index_url}#L{idx}", + created=None, + last_modified=None + ) + # And record the entry: + fmts.append(f) + + # Now yield them, so all the log entries get stored too: + for f in fmts: + yield f \ No newline at end of file diff --git a/foreging/models.py b/foreging/models.py new file mode 100644 index 0000000..a412afc --- /dev/null +++ b/foreging/models.py @@ -0,0 +1,93 @@ +import json +from typing import List, Optional, Set, Dict, Tuple, Type, Union, Literal, Annotated, Iterator +from pydantic import BaseModel +from datetime import datetime, date +from abc import ABC, abstractmethod + +# +# A Software record +# +class Software(BaseModel): + id: str + name: str + version: Optional[str] = None + summary: Optional[str] = None + license: Optional[str] = None + registry_url: Optional[str] = None + + # Define how to spot unique entries in a set + def __hash__(self): + return hash(self.id) + def __eq__(self,other): + return self.id == other.id + +# +# Data model of normalised form of a format record: +# +class Format(BaseModel): + id: str | None + name: str | None + version: str | None + summary: str | None + genres: list[str] | None + extensions: list[str] | None + media_types: list[str] + has_magic: bool + primary_media_type: str | None = None + parent_media_type: str | None = None + # When this format was published or became available for use (ideally the release year): + released_in: str | None = None + # Tracing back to the source registry: + registry_url: str | None = None + registry_source_data_url: str | None = None + registry_index_data_url: str | None = None + # The created and modified dates for this record about a format + created: date | None = None + last_modified: date | None = None + + readers: Optional[list[Software]] = [] + writers: Optional[list[Software]] = [] + + registry_id: str | None + + +# +# And for a 
Registry: +# +class RegistryDataLogEntry(BaseModel): + level: str + message: str + url: Optional[str] = None + + # Define how to spot unique entries in a set + def __hash__(self): + return hash(self.message) + def __eq__(self,other): + return self.message == other.message + +class Registry(BaseModel): + id: str + name: str + url: str + id_prefix: Optional[str] = None + index_data_url: Optional[str] = None + # Log for any issues + data_log: list[RegistryDataLogEntry] = [] + # The set of extensions known to this registry: + extensions: list[str] = None + + +# +# An Abstract Base Class for the client code: +# +class RegistryClient(ABC): + + @property + @abstractmethod + def registry(self) -> Registry: + pass + + @abstractmethod + def get_formats(self) -> Iterator[Format]: + ... + diff --git a/foreging/nara.py b/foreging/nara.py new file mode 100644 index 0000000..7c5876b --- /dev/null +++ b/foreging/nara.py @@ -0,0 +1,105 @@ +import json +import logging +from rdflib import Graph, RDF, DCTERMS +from rdflib.namespace import DefinedNamespace, Namespace +from rdflib.term import URIRef +from .models import Software, Format, Registry, RegistryClient + +logger = logging.getLogger(__name__) + +# +# Define RDF entities needed to work with this source: +# +class NARA(DefinedNamespace): + FileFormat: URIRef # File Format + category: URIRef + formatName: URIRef + preservationAction: URIRef + preservationPlan: URIRef + riskLevel: URIRef + tools: URIRef + + _NS = Namespace("https://www.archives.gov/data/lod/dpframework/def/") + +class WDT(DefinedNamespace): + p1163: URIRef # Media Type + p1195: URIRef # File Extension + p2748: URIRef # PRONOM link + p3381: URIRef # File Formats Wiki link + p973: URIRef # Wikipedia link + + _NS = Namespace("http://www.wikidata.org/entity/") + + + +# +# NARA File Format Preservation Plan parser +# +class NARA_FFPP(RegistryClient): + registry_id = "naradpf" + source_file = 'digipres.github.io/_sources/registries/nara/fileformats.ttl' + registry = 
Registry( + id=registry_id, + name="NARA Digital Preservation Framework", + url="https://www.archives.gov/preservation/digital-preservation/linked-data", + id_prefix='https://www.archives.gov/files/lod/dpframework/id/' + ) + + def get_formats(self): + + g = Graph() + g.parse(self.source_file) + + for s, p, o in g.triples((None, RDF.type, NARA.FileFormat)): + ff_id = f"{self.registry_id}:{g.value(s, DCTERMS.identifier)}.ttl" + # Grab: Action, Plan, Tools, PUID, FFW, Described-At + additional = {} + for p in [ NARA.preservationAction, NARA.preservationPlan, WDT.p2748, WDT.p3381, WDT.p973]: + value = g.value(s, p) + if value: + additional[p] = [o for s, p, o in g.triples((s, p, None))] + logger.debug("Additional fields: " + json.dumps(additional, indent=2)) + # Set up other fields: + extensions = set() + for ext in [o for s, p, o in g.triples((s, WDT.p1195, None))]: + ext = str(ext) + extensions.add(ext) + genres = [] + for genre in [o for s, p, o in g.triples((s, NARA.category, None))]: + genres.append(str(genre)) + media_types = [] + for mt in [o for s, p, o in g.triples((s, WDT.p1163, None))]: + media_types.append(str(mt)) + readers = [] + for tools in [o for s, p, o in g.triples((s, NARA.tools, None))]: + for tool in str(tools).split(';'): + sw = Software( + registry=self.registry, + id=f"{ff_id}#{len(readers)}", + name=tool.strip() + ) + readers.append(sw) + + # Set up as a format entity: + f = Format( + registry_id=self.registry_id, + id=ff_id, + name=g.value(s, NARA.formatName), + version=None, + summary=g.value(s, DCTERMS.description), + genres=genres, + extensions=list(extensions), + media_types=media_types, + has_magic=False, + primary_media_type=None, + parent_media_type=None, + registry_url=f"https://www.archives.gov/preservation/digital-preservation/linked-data#{ff_id}", + registry_source_data_url=f"https://www.archives.gov/files/lod/dpframework/id/{ff_id}.ttl", + 
registry_index_data_url=f"https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/nara/fileformats.ttl#{ff_id}", + created=None, + last_modified=None, + readers=readers + ) + yield f + + diff --git a/foreging/populate.py b/foreging/populate.py new file mode 100644 index 0000000..3c41135 --- /dev/null +++ b/foreging/populate.py @@ -0,0 +1,131 @@ +from .file import File +from .ffw import FFW +from .linguist import Linguist +from .loc_fdd import LocFDD +from .mediainfo import MediaInfo +from .nara import NARA_FFPP +from .pronom import PRONOM +from .tcdb import TCDB +from .tffh import TFFH +from .tika import Tika +from .trid import TrID +from .wikidata import WikiData +from .models import Format, RegistryClient + +from pydantic import BaseModel +from sqlite_utils import Database +import pyarrow as pa +import pyarrow.parquet as pq +import argparse +import logging +from pathlib import Path +import json +from typing import Dict +from collections import defaultdict + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +# Push in the data: +def populate_database(session, gen, mts, genres): + log.info("Getting transformed format records for registry ID %s..." 
% gen.registry_id) + for f in gen.get_formats(mts, genres): + session.add(f) + +if __name__ == "__main__": + # Registries + registries: Dict[str,RegistryClient] = {} + for r in [File(), FFW(), Linguist(), LocFDD(), MediaInfo(), NARA_FFPP(), PRONOM(), TCDB(), TFFH(), Tika(), TrID(), WikiData()]: + registries[r.registry.id] = r + + # Args + parser = argparse.ArgumentParser() + parser.add_argument('--only', required=False, choices=registries.keys()) + parser.add_argument('--jsonl', action=argparse.BooleanOptionalAction) + parser.add_argument('output_path') + args = parser.parse_args() + + # Get the output file: + output_path= Path(args.output_path) + if not output_path.exists(): + output_path.mkdir() + elif output_path.is_file(): + raise Exception("Output path should not be a file!") + + # Gather the data: + formats = [] + for reg_id in registries: + reg = registries[reg_id] + reg.registry.extensions = set() + if args.only == None or args.only == reg_id: + log.info(f"Parsing data from Registry ID = {reg.registry_id}") + for f in reg.get_formats(): + formats.append(f) + for ext in f.extensions: + reg.registry.extensions.add(ext.lower()) + # Convert set to list: + reg.registry.extensions = list(reg.registry.extensions) + + # Generate extensions lookup dataset, sorted by extension to hopefully make it faster: + ext_to_fmt = {} + for f in formats: + f: Format + for ext in f.extensions: + entries: list = ext_to_fmt.get( ext, [] ) + entries.append(f) + ext_to_fmt[ext] = entries + extensions = [] + for ext,fmts in sorted(ext_to_fmt.items()): + extensions.append({ + 'id': ext, + 'format_ids': [f.id for f in fmts] + }) + + # Define outputs: + outputs = { + "registries": [registries[id].registry for id in registries], + "formats": formats, + "extensions": extensions + } + + # Generate raw JSONL output + if args.jsonl: + log.info("Generating JSONL export...") + for name, records in outputs.items(): + with open( output_path / f"{name}.jsonl", "w") as f: + for r in records: + if 
isinstance(r, BaseModel): + f.write(r.model_dump_json()) + else: + f.write(json.dumps(r)) + f.write("\n") + + # And generate Parquet version: + log.info("Generating Parquet export...") + for name, records in outputs.items(): + plain_records = [] + for item in records: + if isinstance(item, BaseModel): + plain_records.append(item.model_dump()) + else: + plain_records.append(item) + table = pa.Table.from_pylist(plain_records) + # Sort the records by the ID field and note this in the Parquet: + sort_order = [('id', 'ascending')] + table = table.sort_by(sort_order) + sorting_columns = pq.SortingColumn.from_ordering(table.schema, sort_order) + pq.write_table(table, output_path / f"{name}.parquet", write_page_index=True, sorting_columns=sorting_columns) + + # Generate SQLite DB + log.info("Generating SQLite export...") + sql_path = output_path / "registries.db" + db = Database(sql_path, recreate=True) + for r in formats: + db["formats"].insert(r.model_dump(), pk="id") + for r in outputs["registries"]: + db["registries"].insert(r.model_dump(), pk="id") + db["formats"].enable_fts(['name', 'version', 'summary', 'genres', 'extensions', 'media_types', 'writers', 'readers']) + # sqlite-utils add-foreign-key data/registries.db formats registry_id registries id + db["formats"].add_foreign_key('registry_id', 'registries', 'id') + + diff --git a/foreging/pronom.py b/foreging/pronom.py new file mode 100644 index 0000000..6d9875b --- /dev/null +++ b/foreging/pronom.py @@ -0,0 +1,114 @@ +import os +import logging +import datetime +from bs4 import BeautifulSoup +from .models import Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class PRONOM(RegistryClient): + registry_id = "pronom" + source_folder = 'digipres.github.io/_sources/registries/pronom/' + warnings = [] + show_parsed_xml_on_errors = False + registry = Registry( + id=registry_id, + name="UK National Archives PRONOM Technical Registry", + 
url="https://www.nationalarchives.gov.uk/PRONOM/", + id_prefix='https://www.nationalarchives.gov.uk/PRONOM/', + index_data_url=f"https://github.com/digipres/{source_folder}" + ) + + def _date_parser(self, pronom_date): + # PRONOM uses '11 Apr 2024' format so this needs parsing here: + date = datetime.datetime.strptime(pronom_date, "%d %b %Y") + return date + + def get_formats(self): + for source_folder_name in ['fmt', 'x-fmt']: + source_folder = os.path.join(self.source_folder, source_folder_name) + + for filename in os.listdir(source_folder): + if filename.endswith(".xml"): + logger.debug(f"Parsing {filename}...") + with open(f"{source_folder}/{filename}", "rb") as f: + xml = f.read() + root = None + try: + # Alternative code that was more difficult to work with: + #parser = etree.XMLParser() + #root = etree.parse(BytesIO(xml), parser) + root = BeautifulSoup(xml, "xml") + ffd_id = f"pronom:{source_folder_name}/{filename[0:-4]}" + f_name = root.find('FormatName').text + # Genres: + f_types = root.find('FormatTypes').text.strip().split(',') + if( len(f_types) == 0 ): + f_types = [""] + # Strip whitespace from genres: + f_types = [g.strip() for g in f_types] + # Replace empty strings with "Undefined": + f_types = ['undefined' if not g else g for g in f_types] + # Internal signatures: + if root.find('InternalSignature'): + f_magic = True + else: + f_magic = False + # Get extensions: + extensions = list() + for fe in root.findAll('ExternalSignature'): + if fe.find('SignatureType', string='File extension'): + ext = fe.find('Signature').text + extensions.append(ext) + f_extensions = extensions + # Get MIME types: + mimetypes = list() + for ffi in root.findAll('FileFormatIdentifier'): + if ffi.find('IdentifierType', string='MIME'): + mt = ffi.find('Identifier').text + mimetypes.append(mt) + f_mimetypes = mimetypes + # Release date as year (source format is '24 Dec 1999' if not empty/whitespace): + release_year = root.find("ReleaseDate").text.strip() + if release_year: + 
release_year = datetime.datetime.strptime(release_year, "%d %b %Y") + release_year = str(release_year.year) + # Create record: + f = Format( + registry_id=self.registry_id, + id=ffd_id, + name=f_name, + version=root.find("FormatVersion").text, + summary=root.find("FormatDescription").text, + genres=f_types, + extensions=f_extensions, + media_types=f_mimetypes, + has_magic=f_magic, + primary_media_type=None, + parent_media_type=None, + released_in=release_year, + registry_url=f"https://www.nationalarchives.gov.uk/pronom/{ffd_id}", + registry_source_data_url=f"https://www.nationalarchives.gov.uk/pronom/{ffd_id}.xml", + registry_index_data_url=f"https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/pronom/{ffd_id}.xml", + #additional_fields= None, + created=self._date_parser(root.find('ProvenanceSourceDate').text), + last_modified=self._date_parser(root.find('LastUpdatedDate').text), + ) + yield f + except Exception as e: + logger.exception(f"Parsing {filename} failed", e) + self.warnings.append(f"Error when parsing XML from '{filename}': {e}") + # Emit extra debug info if possible: + if root and self.show_parsed_xml_on_errors: + logger.error("XML parsed as:") + logger.error(root.prettify()) + #print(etree.tostring(root, pretty_print=True).decode('utf-8')) + break + + +if __name__ == "__main__": + gen = PRONOM() + gen.show_parsed_xml_on_errors = True + for f in gen.get_formats(): + print(f.model_dump_json()) \ No newline at end of file diff --git a/foreging/tcdb.py b/foreging/tcdb.py new file mode 100644 index 0000000..191bf43 --- /dev/null +++ b/foreging/tcdb.py @@ -0,0 +1,93 @@ +import os +import csv +import logging +from .models import Software, Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# +# +# TCDB CSV dump parser +# +class TCDB(RegistryClient): + registry_id = "tcdb" + registry_url = f"https://github.com/thorsted/Born-Digital-Scripts/tree/main/TC%20Identification" 
+ source_file = 'digipres.github.io/_sources/registries/tcdb/TCDB_2003.8_data-cleaned.csv' + registry = Registry( + id=registry_id, + name="Macintosh Type/Creator Codes Database", + url=registry_url, + id_prefix=None, + index_data_url=source_file + ) + + def get_formats(self): + # First, gather rows by type_code... + rows_by_type_code = {} + # Open, coping with Unicode BOM + line = 1 + with open(self.source_file, "r", encoding='utf-8-sig') as csv_file: + reader = csv.DictReader(csv_file) + for row in reader: + logger.debug(f"Processing row: {row}") + type_code = row['Type'].strip() + rows_by_type_code[type_code] = rows_by_type_code.get(type_code, []) + rows_by_type_code[type_code].append(row) + line += 1 + row['_line_number'] = line + + # Now, process each type_code: + sws = {} + for type_code, rows in rows_by_type_code.items(): + readers = [] + extensions = set() + categories = set() + names = [] + for row in rows: + logger.debug(f"Processing row: {row}") + creator_code = row['Creator'].strip() + # + ext = row['Extension'].strip().lower() + if ext: + extensions.add(ext) + # + cat = row['Category'].strip() + if cat: + categories.add(cat) + # + names.append(row['File Name'].strip()) + # Record the Software ID, adding a line number to make sure everything has distinct IDs. + sw_id = f"tcdb:{type_code}:{creator_code}#L{row['_line_number']}" + sws[sw_id] = sws.get(sw_id, + Software( + registry=self.registry, + id=sw_id, + name=row['Comments'].strip(), # Software name usually stored in the Comments field. + version=None, + summary=row['File Name'].strip() + ) + ) + readers.append(sws[sw_id]) + # Set up as a format entity for this type_code: + f = Format( + registry_id=self.registry_id, + id=f"tcdb:{type_code}", + name= ", ".join(names)[:256], # FIXME Limit size as this includes too much software information and is very slow to work with! 
+ version=None, + summary=None, + genres=list(categories), + extensions=list(extensions), + media_types=[], + has_magic=False, + primary_media_type=None, + parent_media_type=None, + registry_url=None, + registry_source_data_url=None, + registry_index_data_url=None, + created=None, + last_modified=None, + readers=readers + ) + logger.debug(f"Generated format: {f}") + yield f diff --git a/foreging/tffh.py b/foreging/tffh.py new file mode 100644 index 0000000..9ea4b92 --- /dev/null +++ b/foreging/tffh.py @@ -0,0 +1,59 @@ +import csv +import logging +from .models import Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +class TFFH(RegistryClient): + source_file = "digipres.github.io/_sources/registries/tffh/tffh.csv" + index_url = "https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/tffh/tffh.csv" + # Set up the Registry object for this class: + registry_id = "tffh" + registry = Registry( + id=registry_id, + name="The File Formats Handbook (1995)", + url="https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/tffh/README.md", + id_prefix=None, + index_data_url=index_url + ) + + def get_formats(self): + fmts = [] + idx = 1 + with open(self.source_file, "r") as f: + reader = csv.DictReader(f) + for row in reader: + idx += 1 + primary_exts = row['Primary File'].strip() + if primary_exts: + # Assemble all entries, skipping empties: + extensions = (primary_exts + " " + row["Secondary Files"].strip()).split(" ") + extensions = [e for e in extensions if e] + # Set up as a format entity: + f = Format( + registry_id=self.registry_id, + id=f"{self.registry_id}:{idx}", + name=row["Name"].strip(), + version=None, + summary=None, + genres=[row["Genre"].strip()], + extensions=extensions, + media_types=[], + has_magic=True, + primary_media_type=None, + parent_media_type=None, + registry_url=None, + registry_source_data_url=None, + 
registry_index_data_url=f"{self.index_url}#L{idx}", + created=None, + last_modified=None + ) + # And record the entry: + fmts.append(f) + + # Now yield them, so all the log entries get stored too: + for f in fmts: + yield f \ No newline at end of file diff --git a/foreging/tika.py b/foreging/tika.py new file mode 100644 index 0000000..06dc9dc --- /dev/null +++ b/foreging/tika.py @@ -0,0 +1,121 @@ +import json +import yaml +from lxml import etree +from io import BytesIO +import logging +from .models import Format, Registry, RegistryDataLogEntry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +class Tika(RegistryClient): + source_file = "digipres.github.io/_sources/registries/tika/tika-mimetypes.xml" + source_url = "https://svn.apache.org/repos/asf/tika/trunk/tika-core/src/main/resources/org/apache/tika/mime/tika-mimetypes.xml" + index_url = "https://github.com/digipres/digipres.github.io/blob/master/_sources/registries/tika/tika-mimetypes.xml" + # Set up the Registry object for this class: + registry_id = "tika" + registry = Registry( + id=registry_id, + name="Apache Tika", + url="https://tika.apache.org/", + id_prefix=None, + index_data_url=index_url + ) + + def get_formats(self): + fmts = [] + with open(self.source_file, "rb") as f: + xml = f.read() + log = [] + try: + parser = etree.XMLParser() + root = etree.parse(BytesIO(xml), parser) + except Exception as e: + log.append( + RegistryDataLogEntry( + level='warning', + message="Error when parsing XML: "+str(e) + ) + ) + parser = etree.XMLParser(recover=True) + root = etree.parse(BytesIO(xml), parser) + for ff in root.findall('mime-type'): + finfo = {} + fid = ff.get('type') + finfo['id'] = fid + finfo['source'] = f"#L{ff.sourceline}" + # Build the name: + if ff.find('_comment') is not None: + finfo['name'] = ff.find('_comment').text + # Has Magic? 
+ if ff.find('magic') is not None: + finfo['hasMagic'] = True + else: + finfo['hasMagic'] = False + # Look for extensions: + extensions = list() + for ext in ff.findall('glob'): + extension = ext.get('pattern').replace('*.','') # Strip the glob + extensions.append(extension) + finfo['extensions'] = extensions + # Look for MIME Types: + mimetypes = list() + mimetypes.append(fid) + if ff.find('alias') is not None: + for alias in ff.findall('alias'): + mt = alias.get('type') + if mt: + if mt not in mimetypes: + mimetypes.append(mt) + else: + log.append( + RegistryDataLogEntry( + level='warning', + message="Duplicate MIME type %s for type %s." % (alias.get('type'), fid) + )) + # TODO Spot duplicate aliases. + finfo['mimetypes'] = mimetypes + # Relationships: + if ff.find('sub-class-of') is not None: + finfo['supertype'] = ff.find('sub-class-of').get('type') + if finfo['supertype'] == fid: + log.append( + RegistryDataLogEntry( + level="warning", + message="Format %s has itself as a supertype!" 
% fid + )) + #addFormat(rid,fid,finfo) + # Also record the XML error, if there was one: + self.registry.data_log.extend(log) + + # Post-process mimetypes: + media_types = finfo['mimetypes'] + parent = finfo.get('supertype', None) + + # Set up as a format entity: + f = Format( + registry_id=self.registry_id, + id=f"{self.registry_id}:{fid}", + name=finfo.get('name', None), + version=None, + summary=None, + genres=[], + extensions=list(finfo['extensions']), + media_types=media_types, + has_magic=finfo['hasMagic'], + primary_media_type=media_types[0], + parent_media_type=parent, + registry_url=None, + registry_source_data_url=self.source_url + finfo['source'], + registry_index_data_url=self.index_url + finfo['source'], + created=None, + last_modified=None + ) + # And record the entry: + fmts.append(f) + + # Now yield them, so all the log entries get stored too: + for f in fmts: + yield f \ No newline at end of file diff --git a/foreging/trid.py b/foreging/trid.py new file mode 100644 index 0000000..f0732b3 --- /dev/null +++ b/foreging/trid.py @@ -0,0 +1,89 @@ +import json +import yaml +import os +from lxml import etree +from io import BytesIO +import logging +from .models import Format, Registry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +class TrID(RegistryClient): + source_dir = "digipres.github.io/_sources/registries/trid" + source_url = "" + index_url = "https://github.com/digipres/digipres.github.io/tree/master/_sources/registries/trid/triddefs_xml/" + # Set up the Registry object for this class: + registry_id = "trid" + registry = Registry( + id=registry_id, + name="TrID - File Identifier", + url="https://www.mark0.net/soft-trid-e.html", + id_prefix=None, + index_data_url=None + ) + + fmts = [] + + def add_format(self, fid, finfo): + media_types = [] + for mt in finfo['mimetypes']: + media_types.append(mt) + extensions = [] + for ext in finfo['extensions']: + extensions.append(ext) + # Set up as a 
format entity: + f = Format( + registry_id=self.registry_id, + id=f"{self.registry_id}:{fid}", + name=finfo.get('name', None), + version=None, + summary=None, + genres=[], + extensions=extensions, + media_types=media_types, + has_magic=finfo['hasMagic'], + primary_media_type=None, + parent_media_type=None, + registry_url=None, + registry_source_data_url=self.source_url + finfo['source'], + registry_index_data_url=self.index_url + finfo['source'], + created=None, + last_modified=None + ) + # And record the entry: + self.fmts.append(f) + + + + def get_formats(self): + for filename in os.listdir(f'{self.source_dir}/triddefs_xml'): + if filename.endswith(".trid.xml"): + # Get Identifier? + with open(f'{self.source_dir}/triddefs_xml/'+filename, "r") as f: + finfo = {} + finfo['source'] = filename + root = etree.parse(f) + fid = filename[:-9] + finfo['name'] = root.findall('Info/FileType')[0].text + if root.find('FrontBlock') is not None: + finfo['hasMagic'] = True + else: + finfo['hasMagic'] = False + # Get extensions: + extensions = list() + for fe in root.findall('Info/Ext'): + if(fe.text != None): + for ext in fe.text.split("/"): + if ext not in extensions: + extensions.append("%s" % ext.lower()) + finfo['extensions'] = extensions + # Get MIME types: + finfo['mimetypes'] = list() + self.add_format(fid, finfo) + + # Now yield them, so all the log entries get stored too: + for f in self.fmts: + yield f \ No newline at end of file diff --git a/foreging/wikidata.py b/foreging/wikidata.py new file mode 100644 index 0000000..b076573 --- /dev/null +++ b/foreging/wikidata.py @@ -0,0 +1,156 @@ +import json +import logging +from .models import Software, Format, Registry, RegistryDataLogEntry, RegistryClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# +# WikiData dumps parser +# +class WikiData(RegistryClient): + source_file_dir = "digipres.github.io/_sources/registries/wikidata" + fmt_source_file = f"{source_file_dir}/wikidata.json" 
+ sw_r_source_file = f"{source_file_dir}/wikidata-reads.json" + sw_w_source_file = f"{source_file_dir}/wikidata-writes.json" + + # Set up the Registry object for this class: + registry_id = "wikidata" + registry = Registry( + id=registry_id, + name="WikiData", + url="https://www.wikidata.org/wiki/Wikidata:WikiProject_Informatics/Structures/File_formats", + id_prefix='http://www.wikidata.org/entity/', + index_data_url=f"https://github.com/digipres/digipres.github.io/blob/master/{source_file_dir}" + ) + + + def get_formats(self): + + with open (self.fmt_source_file, 'r') as f: + wd = json.load(f) + + fmts = {} + warnings = set() + + current_qid = None + + for fmt in wd: + qid = f"wikidata:{fmt['id']}" + # items are ordered by ID, so we can aggregate as we go + if qid != current_qid: + # Store the previous record: + if current_qid: + fmts[current_qid] = finfo + current_qid = qid + # Start a new record: + finfo = {} + finfo['name'] = fmt['name'] + finfo['source'] = fmt['source'] + finfo['extensions'] = set() + finfo['mimetypes'] = set() + finfo['hasMagic'] = False + finfo['readers'] = set() + finfo['writers'] = set() + # Aggregate value for each ID + for key in fmt: + if key == 'extension' and fmt[key]: + # Making sure we reuse the same object for an extension to keep the model consistent: + ext = fmt[key] + finfo['extensions'].add(ext) + if key == 'mimetype' and fmt[key]: + mt = fmt[key] + finfo['mimetypes'].add(mt) + if key == 'sig' and fmt[key]: + finfo['hasMagic'] = True + + # Add the final one: + if current_qid: + fmts[current_qid] = finfo + + # Now get the software: + + # Load the 'what reads this' and 'what writes this' data: + with open (self.sw_r_source_file, 'r') as f: + sw_r = json.load(f) + with open (self.sw_w_source_file, 'r') as f: + sw_w = json.load(f) + + # Process the software data: + sws = {} + for mode, sw_i in [('reads', sw_r), ('writes', sw_w)]: + for sw in sw_i: + qid = sw['format'].replace("http://www.wikidata.org/entity/","wikidata:") + sw_qid 
= sw['id'] + # Check it's in the set: + if qid not in fmts: + warning = f"Software entry '{sw_qid}: {sw['formatLabel']}' references missing format '{qid}'" + logger.debug( warning ) + warnings.add( RegistryDataLogEntry(level="warning", message=warning, url=sw['source'] ) ) + continue + if sw_qid not in sws: + sws[sw_qid] = sw + sws[sw_qid]['reads'] = [] + sws[sw_qid]['writes'] = [] + sws[sw_qid][mode].append(qid) + + # Now add the software to the formats: + for sw in sws.values(): + s = self.make_software(sw) + for qid in sw['reads']: + fmts[qid]['readers'].add(s) + for qid in sw['writes']: + fmts[qid]['writers'].add(s) + + # Store the warnings: + self.registry.data_log = list(warnings) + + # And return the format: + for qid in fmts: + info = fmts[qid] + yield self.make_format(qid,info) + + + def make_format(self, current_qid, finfo): + + # Set up as a format entity: + f = Format( + id=f"{current_qid}", + registry_id=self.registry_id, + name=finfo['name'], + version=None, + summary=None, + genres= [], + extensions=list(finfo['extensions']), + media_types=list(finfo['mimetypes']), + has_magic=finfo['hasMagic'], + primary_media_type=None, + parent_media_type=None, + registry_url=finfo['source'], + registry_source_data_url=f"{finfo['source']}.jsonld", + registry_index_data_url=None, + #additional_fields={}, + created=None, + last_modified=None, + readers=list(finfo['readers']), + writers=list(finfo['writers']) + ) + logger.debug(f"Generated format: {f}") + return f + + + def make_software(self, info): + s = Software( + registry_id=self.registry_id, + id=f"wikidata:{info['id']}", + name=info['name'], + version=None, + summary=None, + registry_url=info['source'], + license=info['licenseLabel'], + ) + logger.debug(f"Generated software: {s}") + return s + + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..2bc7329 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "sentinel" +version = "2.0.0" +dependencies = [ + 
"requests", + "pyyaml", + "beautifulsoup4", + "lxml", + "mwclient", + "mwparserfromhell", + "rdflib", + "polyfile", + "pyarrow", + "pydantic", + "sqlite-utils", + "sqlmodel" +] + +[tool.setuptools.packages.find] +include = ["foreging"] diff --git a/run-in-datasette.sh b/run-in-datasette.sh new file mode 100755 index 0000000..59b72b1 --- /dev/null +++ b/run-in-datasette.sh @@ -0,0 +1,2 @@ +#!/bin/sh +uvx datasette digipres.github.io/_data/formats/registries.db --config facet_time_limit_ms:1000 --config default_facet_size:15 diff --git a/setup.sh b/setup.sh index 682c713..131a363 100755 --- a/setup.sh +++ b/setup.sh @@ -15,10 +15,7 @@ cd pywikibot pip install . cd .. -pip install requests -pip install pyyaml -pip install beautifulsoup4 -pip install lxml +pip install . # Running... echo "And login..."