-
Notifications
You must be signed in to change notification settings - Fork 30
Add initial support for crypto callbacks. #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| # test_cryptocb.py | ||
| # | ||
| # Copyright (C) 2026 wolfSSL Inc. | ||
| # | ||
| # This file is part of wolfSSL. (formerly known as CyaSSL) | ||
| # | ||
| # wolfSSL is free software; you can redistribute it and/or modify | ||
| # it under the terms of the GNU General Public License as published by | ||
| # the Free Software Foundation; either version 2 of the License, or | ||
| # (at your option) any later version. | ||
| # | ||
| # wolfSSL is distributed in the hope that it will be useful, | ||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| # GNU General Public License for more details. | ||
| # | ||
| # You should have received a copy of the GNU General Public License | ||
| # along with this program; if not, write to the Free Software | ||
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA | ||
|
|
||
| import pytest | ||
|
|
||
| from wolfcrypt._ffi import lib as _lib | ||
| from wolfcrypt.random import Random | ||
|
|
||
|
|
||
| if not _lib.CRYPTO_CB_ENABLED: | ||
| pytest.skip("Crypto Callbacks not supported", allow_module_level=True) | ||
|
|
||
| from wolfcrypt.cryptocb import CryptoCallback | ||
|
|
||
|
|
||
| def test_default_device_id(): | ||
| print(f"Default device ID = {CryptoCallback.default_device_id()}") | ||
|
|
||
| class RngCryptoCallback(CryptoCallback): | ||
| def rng_callback(self, _device_id: int, _rng, size: int) -> bytes: | ||
| # Generate fake random data for testing purposes. | ||
| return bytes(range(1, 1 + size)) | ||
|
|
||
|
|
||
| def test_rng_callback(): | ||
| with RngCryptoCallback(10): | ||
| rng = Random(device_id=10) | ||
|
|
||
| random = rng.byte() | ||
| assert random == b"\01" | ||
|
|
||
| random = rng.bytes(1) | ||
| assert random == b"\01" | ||
|
|
||
| random = rng.bytes(3) | ||
| assert random == b"\01\02\03" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| # cryptocb.py | ||
| # | ||
| # Copyright (C) 2026 wolfSSL Inc. | ||
| # | ||
| # This file is part of wolfSSL. (formerly known as CyaSSL) | ||
| # | ||
| # wolfSSL is free software; you can redistribute it and/or modify | ||
| # it under the terms of the GNU General Public License as published by | ||
| # the Free Software Foundation; either version 2 of the License, or | ||
| # (at your option) any later version. | ||
| # | ||
| # wolfSSL is distributed in the hope that it will be useful, | ||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| # GNU General Public License for more details. | ||
| # | ||
| # You should have received a copy of the GNU General Public License | ||
| # along with this program; if not, write to the Free Software | ||
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA | ||
|
|
||
| # pylint: disable=no-member,no-name-in-module | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import Final | ||
| from typing_extensions import Self | ||
|
|
||
| from wolfcrypt._ffi import ffi as _ffi | ||
| from wolfcrypt._ffi import lib as _lib | ||
|
|
||
| from wolfcrypt.exceptions import WolfCryptError | ||
|
|
||
| ALGO_TYPE_NAME: Final = { | ||
| _lib.WC_ALGO_TYPE_HASH: "hash", | ||
| _lib.WC_ALGO_TYPE_CIPHER: "cipher", | ||
| _lib.WC_ALGO_TYPE_RNG: "rng", | ||
| _lib.WC_ALGO_TYPE_SEED: "seed", | ||
| } | ||
|
|
||
| HASH_TYPE_NAME: Final = { | ||
| _lib.WC_HASH_TYPE_SHA: "SHA1", | ||
| _lib.WC_HASH_TYPE_SHA256: "SHA256", | ||
| _lib.WC_HASH_TYPE_SHA384: "SHA384", | ||
| _lib.WC_HASH_TYPE_SHA512: "SHA512", | ||
| _lib.WC_HASH_TYPE_SHA3_256: "SHA3_256", | ||
| _lib.WC_HASH_TYPE_SHA3_384: "SHA3_384", | ||
| _lib.WC_HASH_TYPE_SHA3_512: "SHA3_512", | ||
| } | ||
|
|
||
| DIGEST_SIZE: Final = { | ||
| _lib.WC_HASH_TYPE_SHA: 20, | ||
| _lib.WC_HASH_TYPE_SHA256: 32, | ||
| _lib.WC_HASH_TYPE_SHA384: 48, | ||
| _lib.WC_HASH_TYPE_SHA512: 64, | ||
| _lib.WC_HASH_TYPE_SHA3_256: 32, | ||
| _lib.WC_HASH_TYPE_SHA3_384: 48, | ||
| _lib.WC_HASH_TYPE_SHA3_512: 64, | ||
| } | ||
|
|
||
|
|
||
| if _lib.CRYPTO_CB_ENABLED: | ||
| class CryptoCallback: | ||
| def __init__(self, device_id: int): | ||
| self.device_id = device_id | ||
| self.ctx = _ffi.new_handle(self) | ||
| ret = _lib.wc_CryptoCb_RegisterDevice(device_id, _lib.py_wc_crypto_callback, self.ctx) | ||
| if ret < 0: # pragma: no cover | ||
| raise WolfCryptError(f"CryptoCb device registration error ({ret})") | ||
|
|
||
| def __enter__(self) -> Self: | ||
| return self | ||
|
|
||
| def __exit__(self, exc_type, exc_value, traceback) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 [High] Double unregister of device on context-manager exit + GC Both def __exit__(self, exc_type, exc_value, traceback) -> None:
self._unregister()
def __del__(self) -> None:
self._unregister()
def _unregister(self) -> None:
_lib.wc_CryptoCb_UnRegisterDevice(self.device_id)Recommendation: Track registration state with a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tracking of the registration is done in the wolfcrypt library itself. Unregistering a device twice is a noop. So the unregister in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wc_CryptoCb_UnRegisterDevice looks up by devId only — no ownership check (cryptocb.c:428). The double-unregister is only a no-op if the slot is still INVALID_DEVID at the second call. In the cross-instance case (cb1 exits → cb2 registers same devId → cb1 GC'd), cb1's del will find cb2's live slot and ClearDev it. A _registered flag in del avoids this without removing exit's unregister. |
||
| self._unregister() | ||
|
|
||
| def __del__(self) -> None: | ||
| self._unregister() | ||
|
|
||
| def callback(self, device_id: int, info: _ffi.CData) -> int: | ||
| print(f"{device_id=} algo = {ALGO_TYPE_NAME[info.algo_type]}") | ||
|
JeremiahM37 marked this conversation as resolved.
Outdated
|
||
| # _lib.wc_CryptoCb_InfoString(info) | ||
| try: | ||
| if info.algo_type == _lib.WC_ALGO_TYPE_HASH: | ||
|
JeremiahM37 marked this conversation as resolved.
|
||
| print(f"hash = {HASH_TYPE_NAME[info.hash.type]}") | ||
| print(f"{info.hash.data=} {info.hash.data_size=} {info.hash.digest=} {info.hash.u.sha256=}") | ||
| if info.hash.digest == _ffi.NULL: | ||
| self.hash_update_callback(device_id, info.hash.type, bytes(_ffi.buffer(info.hash.data, info.hash.data_size))) | ||
| else: | ||
| digest = self.hash_finalize_callback(device_id, info.hash.type) | ||
| _ffi.buffer(info.hash.digest, DIGEST_SIZE[info.hash.type])[:] = digest | ||
|
JeremiahM37 marked this conversation as resolved.
|
||
| return 0 | ||
| if info.algo_type == _lib.WC_ALGO_TYPE_CIPHER: | ||
| self.cipher_callback(device_id) | ||
| return 0 | ||
| if info.algo_type == _lib.WC_ALGO_TYPE_RNG: | ||
| out = self.rng_callback(device_id, info.rng.rng, info.rng.sz) | ||
| _ffi.buffer(info.rng.out, info.rng.sz)[:] = out | ||
|
JeremiahM37 marked this conversation as resolved.
|
||
| return 0 | ||
| return _lib.CRYPTOCB_UNAVAILABLE | ||
| except NotImplementedError: | ||
| return _lib.CRYPTOCB_UNAVAILABLE | ||
|
|
||
| def rng_callback(self, device_id: int, rng, size: int) -> bytes: | ||
| raise NotImplementedError | ||
|
|
||
| def hash_update_callback(self, device_id: int, hash_type: int, data: bytes) -> None: | ||
| print("hash_update_callback") | ||
| raise NotImplementedError | ||
|
|
||
| def hash_finalize_callback(self, device_id: int, hash_type: int) -> bytes: | ||
| print("hash_finalize_callback") | ||
| raise NotImplementedError | ||
|
|
||
| def cipher_callback(self, device_id: int) -> None: | ||
| raise NotImplementedError | ||
|
|
||
| def _unregister(self) -> None: | ||
| _lib.wc_CryptoCb_UnRegisterDevice(self.device_id) | ||
|
|
||
| @classmethod | ||
| def default_device_id(cls) -> int: | ||
| return _lib.wc_CryptoCb_DefaultDevID() | ||
Uh oh!
There was an error while loading. Please reload this page.