mirror of
https://gitlab.gnome.org/GNOME/libsecret.git
synced 2024-12-22 12:48:51 +00:00
mock: Port to Python 3
https://bugzilla.gnome.org/show_bug.cgi?id=761834
This commit is contained in:
parent
be5f98bbdb
commit
847dd055e4
@ -88,7 +88,7 @@ service_start (const gchar *mock_script,
|
|||||||
gint output;
|
gint output;
|
||||||
|
|
||||||
gchar *argv[] = {
|
gchar *argv[] = {
|
||||||
"python", (gchar *)mock_script,
|
"python3", (gchar *)mock_script,
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -9,4 +9,5 @@
|
|||||||
# See the included COPYING file for more information.
|
# See the included COPYING file for more information.
|
||||||
#
|
#
|
||||||
|
|
||||||
from service import *
|
from .service import SecretItem, SecretCollection, SecretService, SecretPrompt
|
||||||
|
from .service import PlainAlgorithm, NotSupported
|
||||||
|
@ -18,13 +18,13 @@ import math
|
|||||||
def append_PKCS7_padding(s):
|
def append_PKCS7_padding(s):
|
||||||
"""return s padded to a multiple of 16-bytes by PKCS7 padding"""
|
"""return s padded to a multiple of 16-bytes by PKCS7 padding"""
|
||||||
numpads = 16 - (len(s)%16)
|
numpads = 16 - (len(s)%16)
|
||||||
return s + numpads*chr(numpads)
|
return s + bytes([numpads] * numpads)
|
||||||
|
|
||||||
def strip_PKCS7_padding(s):
|
def strip_PKCS7_padding(s):
|
||||||
"""return s stripped of PKCS7 padding"""
|
"""return s stripped of PKCS7 padding"""
|
||||||
if len(s)%16 or not s:
|
if len(s)%16 or not s:
|
||||||
raise ValueError("String of len %d can't be PCKS7-padded" % len(s))
|
raise ValueError("String of len %d can't be PCKS7-padded" % len(s))
|
||||||
numpads = ord(s[-1])
|
numpads = s[-1]
|
||||||
if numpads > 16:
|
if numpads > 16:
|
||||||
raise ValueError("String ending with %r can't be PCKS7-padded" % s[-1])
|
raise ValueError("String ending with %r can't be PCKS7-padded" % s[-1])
|
||||||
return s[:-numpads]
|
return s[:-numpads]
|
||||||
@ -421,7 +421,7 @@ class AESModeOfOperation(object):
|
|||||||
while len(ar) < end - start:
|
while len(ar) < end - start:
|
||||||
ar.append(0)
|
ar.append(0)
|
||||||
while i < end:
|
while i < end:
|
||||||
ar[j] = ord(string[i])
|
ar[j] = string[i]
|
||||||
j += 1
|
j += 1
|
||||||
i += 1
|
i += 1
|
||||||
return ar
|
return ar
|
||||||
@ -523,7 +523,7 @@ class AESModeOfOperation(object):
|
|||||||
output = []
|
output = []
|
||||||
plaintext = [0] * 16
|
plaintext = [0] * 16
|
||||||
# the output plain text string
|
# the output plain text string
|
||||||
stringOut = ''
|
result = []
|
||||||
# char firstRound
|
# char firstRound
|
||||||
firstRound = True
|
firstRound = True
|
||||||
if cipherIn != None:
|
if cipherIn != None:
|
||||||
@ -549,7 +549,7 @@ class AESModeOfOperation(object):
|
|||||||
else:
|
else:
|
||||||
plaintext[i] = output[i] ^ ciphertext[i]
|
plaintext[i] = output[i] ^ ciphertext[i]
|
||||||
for k in range(end-start):
|
for k in range(end-start):
|
||||||
stringOut += chr(plaintext[k])
|
result.append(plaintext[k])
|
||||||
iput = ciphertext
|
iput = ciphertext
|
||||||
elif mode == self.modeOfOperation["OFB"]:
|
elif mode == self.modeOfOperation["OFB"]:
|
||||||
if firstRound:
|
if firstRound:
|
||||||
@ -567,7 +567,7 @@ class AESModeOfOperation(object):
|
|||||||
else:
|
else:
|
||||||
plaintext[i] = output[i] ^ ciphertext[i]
|
plaintext[i] = output[i] ^ ciphertext[i]
|
||||||
for k in range(end-start):
|
for k in range(end-start):
|
||||||
stringOut += chr(plaintext[k])
|
result.append(plaintext[k])
|
||||||
iput = output
|
iput = output
|
||||||
elif mode == self.modeOfOperation["CBC"]:
|
elif mode == self.modeOfOperation["CBC"]:
|
||||||
output = self.aes.decrypt(ciphertext, key, size)
|
output = self.aes.decrypt(ciphertext, key, size)
|
||||||
@ -579,12 +579,12 @@ class AESModeOfOperation(object):
|
|||||||
firstRound = False
|
firstRound = False
|
||||||
if originalsize is not None and originalsize < end:
|
if originalsize is not None and originalsize < end:
|
||||||
for k in range(originalsize-start):
|
for k in range(originalsize-start):
|
||||||
stringOut += chr(plaintext[k])
|
result.append(plaintext[k])
|
||||||
else:
|
else:
|
||||||
for k in range(end-start):
|
for k in range(end-start):
|
||||||
stringOut += chr(plaintext[k])
|
result.append(plaintext[k])
|
||||||
iput = ciphertext
|
iput = ciphertext
|
||||||
return stringOut
|
return bytes(result)
|
||||||
|
|
||||||
|
|
||||||
def encryptData(key, data, mode=AESModeOfOperation.modeOfOperation["CBC"]):
|
def encryptData(key, data, mode=AESModeOfOperation.modeOfOperation["CBC"]):
|
||||||
@ -640,7 +640,7 @@ def generateRandomKey(keysize):
|
|||||||
"""
|
"""
|
||||||
if keysize not in (16, 24, 32):
|
if keysize not in (16, 24, 32):
|
||||||
emsg = 'Invalid keysize, %s. Should be one of (16, 24, 32).'
|
emsg = 'Invalid keysize, %s. Should be one of (16, 24, 32).'
|
||||||
raise ValueError, emsg % keysize
|
raise ValueError(emsg % keysize)
|
||||||
return os.urandom(keysize)
|
return os.urandom(keysize)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@ -650,7 +650,7 @@ if __name__ == "__main__":
|
|||||||
iv = [103,35,148,239,76,213,47,118,255,222,123,176,106,134,98,92]
|
iv = [103,35,148,239,76,213,47,118,255,222,123,176,106,134,98,92]
|
||||||
mode, orig_len, ciph = moo.encrypt(cleartext, moo.modeOfOperation["CBC"],
|
mode, orig_len, ciph = moo.encrypt(cleartext, moo.modeOfOperation["CBC"],
|
||||||
cypherkey, moo.aes.keySize["SIZE_128"], iv)
|
cypherkey, moo.aes.keySize["SIZE_128"], iv)
|
||||||
print 'm=%s, ol=%s (%s), ciph=%s' % (mode, orig_len, len(cleartext), ciph)
|
print('m=%s, ol=%s (%s), ciph=%s' % (mode, orig_len, len(cleartext), ciph))
|
||||||
decr = moo.decrypt(ciph, orig_len, mode, cypherkey,
|
decr = moo.decrypt(ciph, orig_len, mode, cypherkey,
|
||||||
moo.aes.keySize["SIZE_128"], iv)
|
moo.aes.keySize["SIZE_128"], iv)
|
||||||
print decr
|
print(decr)
|
||||||
|
@ -22,52 +22,26 @@
|
|||||||
import math
|
import math
|
||||||
import random
|
import random
|
||||||
|
|
||||||
PRIME = '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xC9\x0F\xDA\xA2\x21\x68\xC2\x34\xC4\xC6\x62\x8B\x80\xDC\x1C\xD1' \
|
PRIME = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xC9\x0F\xDA\xA2\x21\x68\xC2\x34\xC4\xC6\x62\x8B\x80\xDC\x1C\xD1' \
|
||||||
'\x29\x02\x4E\x08\x8A\x67\xCC\x74\x02\x0B\xBE\xA6\x3B\x13\x9B\x22\x51\x4A\x08\x79\x8E\x34\x04\xDD' \
|
b'\x29\x02\x4E\x08\x8A\x67\xCC\x74\x02\x0B\xBE\xA6\x3B\x13\x9B\x22\x51\x4A\x08\x79\x8E\x34\x04\xDD' \
|
||||||
'\xEF\x95\x19\xB3\xCD\x3A\x43\x1B\x30\x2B\x0A\x6D\xF2\x5F\x14\x37\x4F\xE1\x35\x6D\x6D\x51\xC2\x45' \
|
b'\xEF\x95\x19\xB3\xCD\x3A\x43\x1B\x30\x2B\x0A\x6D\xF2\x5F\x14\x37\x4F\xE1\x35\x6D\x6D\x51\xC2\x45' \
|
||||||
'\xE4\x85\xB5\x76\x62\x5E\x7E\xC6\xF4\x4C\x42\xE9\xA6\x37\xED\x6B\x0B\xFF\x5C\xB6\xF4\x06\xB7\xED' \
|
b'\xE4\x85\xB5\x76\x62\x5E\x7E\xC6\xF4\x4C\x42\xE9\xA6\x37\xED\x6B\x0B\xFF\x5C\xB6\xF4\x06\xB7\xED' \
|
||||||
'\xEE\x38\x6B\xFB\x5A\x89\x9F\xA5\xAE\x9F\x24\x11\x7C\x4B\x1F\xE6\x49\x28\x66\x51\xEC\xE6\x53\x81' \
|
b'\xEE\x38\x6B\xFB\x5A\x89\x9F\xA5\xAE\x9F\x24\x11\x7C\x4B\x1F\xE6\x49\x28\x66\x51\xEC\xE6\x53\x81' \
|
||||||
'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'
|
b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'
|
||||||
|
|
||||||
def num_bits(number):
|
|
||||||
if number == 0:
|
|
||||||
return 0
|
|
||||||
s = "%x" % number
|
|
||||||
return ((len(s)-1)*4) + \
|
|
||||||
{'0':0, '1':1, '2':2, '3':2,
|
|
||||||
'4':3, '5':3, '6':3, '7':3,
|
|
||||||
'8':4, '9':4, 'a':4, 'b':4,
|
|
||||||
'c':4, 'd':4, 'e':4, 'f':4,
|
|
||||||
}[s[0]]
|
|
||||||
|
|
||||||
def num_bytes(number):
|
def num_bytes(number):
|
||||||
if number == 0:
|
return math.ceil(number.bit_length() / 8)
|
||||||
return 0
|
|
||||||
bits = num_bits(number)
|
|
||||||
return int(math.ceil(bits / 8.0))
|
|
||||||
|
|
||||||
def bytes_to_number(data):
|
|
||||||
number = 0L
|
|
||||||
multiplier = 1L
|
|
||||||
for count in range(len(data) - 1, -1, -1):
|
|
||||||
number += multiplier * ord(data[count])
|
|
||||||
multiplier *= 256
|
|
||||||
return number
|
|
||||||
|
|
||||||
def number_to_bytes(number):
|
def number_to_bytes(number):
|
||||||
n_data = num_bytes(number)
|
n_data = num_bytes(number)
|
||||||
data = ['' for i in range(0, n_data)]
|
return number.to_bytes(n_data, 'big')
|
||||||
for count in range(n_data - 1, -1, -1):
|
|
||||||
data[count] = chr(number % 256)
|
|
||||||
number >>= 8
|
|
||||||
return "".join(data)
|
|
||||||
|
|
||||||
def generate_pair():
|
def generate_pair():
|
||||||
prime = bytes_to_number (PRIME)
|
prime = int.from_bytes(PRIME, 'big')
|
||||||
base = 2
|
base = 2
|
||||||
# print "mock prime: ", hex(prime)
|
# print "mock prime: ", hex(prime)
|
||||||
# print " mock base: ", hex(base)
|
# print " mock base: ", hex(base)
|
||||||
bits = num_bits(prime)
|
bits = prime.bit_length()
|
||||||
privat = 0
|
privat = 0
|
||||||
while privat == 0:
|
while privat == 0:
|
||||||
privat = random.getrandbits(bits - 1)
|
privat = random.getrandbits(bits - 1)
|
||||||
@ -75,7 +49,7 @@ def generate_pair():
|
|||||||
return (privat, publi)
|
return (privat, publi)
|
||||||
|
|
||||||
def derive_key(privat, peer):
|
def derive_key(privat, peer):
|
||||||
prime = bytes_to_number (PRIME)
|
prime = int.from_bytes(PRIME, 'big')
|
||||||
key = pow(peer, privat, prime)
|
key = pow(peer, privat, prime)
|
||||||
# print " mock ikm2: ", hex(key)
|
# print " mock ikm2: ", hex(key)
|
||||||
return number_to_bytes(key)
|
return number_to_bytes(key)
|
||||||
|
@ -22,7 +22,7 @@ import math
|
|||||||
from binascii import a2b_hex, b2a_hex
|
from binascii import a2b_hex, b2a_hex
|
||||||
|
|
||||||
class HKDF(object):
|
class HKDF(object):
|
||||||
def __init__(self, ikm, L, salt=None, info="", digestmod = None):
|
def __init__(self, ikm, L, salt=None, info=None, digestmod = None):
|
||||||
self.ikm = ikm
|
self.ikm = ikm
|
||||||
self.keylen = L
|
self.keylen = L
|
||||||
|
|
||||||
@ -36,11 +36,11 @@ class HKDF(object):
|
|||||||
self.hashlen = len(self.digest_cons().digest())
|
self.hashlen = len(self.digest_cons().digest())
|
||||||
|
|
||||||
if salt is None:
|
if salt is None:
|
||||||
self.salt = chr(0)*(self.hashlen)
|
self.salt = b'\x00' * (self.hashlen)
|
||||||
else:
|
else:
|
||||||
self.salt = salt
|
self.salt = salt
|
||||||
|
|
||||||
self.info = info
|
self.info = info or b''
|
||||||
|
|
||||||
#extract PRK
|
#extract PRK
|
||||||
def extract(self):
|
def extract(self):
|
||||||
@ -51,8 +51,8 @@ class HKDF(object):
|
|||||||
#expand PRK
|
#expand PRK
|
||||||
def expand(self):
|
def expand(self):
|
||||||
N = math.ceil(float(self.keylen)/self.hashlen)
|
N = math.ceil(float(self.keylen)/self.hashlen)
|
||||||
T = ""
|
T = b""
|
||||||
temp = ""
|
temp = b""
|
||||||
i=0x01
|
i=0x01
|
||||||
'''while len(T)<2*self.keylen :
|
'''while len(T)<2*self.keylen :
|
||||||
msg = temp
|
msg = temp
|
||||||
@ -66,7 +66,7 @@ class HKDF(object):
|
|||||||
while len(T)<self.keylen :
|
while len(T)<self.keylen :
|
||||||
msg = temp
|
msg = temp
|
||||||
msg += self.info
|
msg += self.info
|
||||||
msg += chr(i)
|
msg += bytes((i,))
|
||||||
h = hmac.new(self.prk, msg, self.digest_cons)
|
h = hmac.new(self.prk, msg, self.digest_cons)
|
||||||
temp = h.digest()
|
temp = h.digest()
|
||||||
i += 1
|
i += 1
|
||||||
|
@ -17,9 +17,7 @@ import sys
|
|||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
import aes
|
from mock import aes, dh, hkdf
|
||||||
import dh
|
|
||||||
import hkdf
|
|
||||||
|
|
||||||
import dbus
|
import dbus
|
||||||
import dbus.service
|
import dbus.service
|
||||||
@ -54,8 +52,7 @@ def next_identifier(prefix=''):
|
|||||||
return "%s%d" % (prefix, unique_identifier)
|
return "%s%d" % (prefix, unique_identifier)
|
||||||
|
|
||||||
def encode_identifier(value):
|
def encode_identifier(value):
|
||||||
return "".join([(c.isalpha() or c.isdigit()) and c or "_%02x" % ord(c) \
|
return "".join([c.isalnum() and c or "_%02x" % ord(c) for c in value])
|
||||||
for c in value.encode('utf-8')])
|
|
||||||
|
|
||||||
def hex_encode(string):
|
def hex_encode(string):
|
||||||
return "".join([hex(ord(c))[2:].zfill(2) for c in string])
|
return "".join([hex(ord(c))[2:].zfill(2) for c in string])
|
||||||
@ -71,9 +68,9 @@ class PlainAlgorithm():
|
|||||||
return (dbus.String("", variant_level=1), session)
|
return (dbus.String("", variant_level=1), session)
|
||||||
|
|
||||||
def encrypt(self, key, data):
|
def encrypt(self, key, data):
|
||||||
return ("", data)
|
return b"", data
|
||||||
|
|
||||||
def decrypt(self, param, data):
|
def decrypt(self, key, params, data):
|
||||||
if params == "":
|
if params == "":
|
||||||
raise InvalidArgs("invalid secret plain parameter")
|
raise InvalidArgs("invalid secret plain parameter")
|
||||||
return data
|
return data
|
||||||
@ -84,7 +81,7 @@ class AesAlgorithm():
|
|||||||
if type (param) != dbus.ByteArray:
|
if type (param) != dbus.ByteArray:
|
||||||
raise InvalidArgs("invalid argument passed to OpenSession")
|
raise InvalidArgs("invalid argument passed to OpenSession")
|
||||||
privat, publi = dh.generate_pair()
|
privat, publi = dh.generate_pair()
|
||||||
peer = dh.bytes_to_number(param)
|
peer = int.from_bytes(param, 'big')
|
||||||
# print "mock publi: ", hex(publi)
|
# print "mock publi: ", hex(publi)
|
||||||
# print " mock peer: ", hex(peer)
|
# print " mock peer: ", hex(peer)
|
||||||
ikm = dh.derive_key(privat, peer)
|
ikm = dh.derive_key(privat, peer)
|
||||||
@ -95,21 +92,17 @@ class AesAlgorithm():
|
|||||||
return (dbus.ByteArray(dh.number_to_bytes(publi), variant_level=1), session)
|
return (dbus.ByteArray(dh.number_to_bytes(publi), variant_level=1), session)
|
||||||
|
|
||||||
def encrypt(self, key, data):
|
def encrypt(self, key, data):
|
||||||
key = map(ord, key)
|
|
||||||
data = aes.append_PKCS7_padding(data)
|
data = aes.append_PKCS7_padding(data)
|
||||||
keysize = len(key)
|
keysize = len(key)
|
||||||
iv = [ord(i) for i in os.urandom(16)]
|
iv = os.urandom(16)
|
||||||
mode = aes.AESModeOfOperation.modeOfOperation["CBC"]
|
mode = aes.AESModeOfOperation.modeOfOperation["CBC"]
|
||||||
moo = aes.AESModeOfOperation()
|
moo = aes.AESModeOfOperation()
|
||||||
(mode, length, ciph) = moo.encrypt(data, mode, key, keysize, iv)
|
(mode, length, ciph) = moo.encrypt(data, mode, key, keysize, iv)
|
||||||
return ("".join([chr(i) for i in iv]),
|
return iv, ciph
|
||||||
"".join([chr(i) for i in ciph]))
|
|
||||||
|
|
||||||
def decrypt(self, key, param, data):
|
def decrypt(self, key, param, data):
|
||||||
key = map(ord, key)
|
|
||||||
keysize = len(key)
|
keysize = len(key)
|
||||||
iv = map(ord, param[:16])
|
iv = param[:16]
|
||||||
data = map(ord, data)
|
|
||||||
moo = aes.AESModeOfOperation()
|
moo = aes.AESModeOfOperation()
|
||||||
mode = aes.AESModeOfOperation.modeOfOperation["CBC"]
|
mode = aes.AESModeOfOperation.modeOfOperation["CBC"]
|
||||||
decr = moo.decrypt(data, None, mode, key, keysize, iv)
|
decr = moo.decrypt(data, None, mode, key, keysize, iv)
|
||||||
@ -197,7 +190,7 @@ class SecretItem(dbus.service.Object):
|
|||||||
self.collection = collection
|
self.collection = collection
|
||||||
self.identifier = identifier
|
self.identifier = identifier
|
||||||
self.label = label or "Unnamed item"
|
self.label = label or "Unnamed item"
|
||||||
self.secret = secret
|
self.secret = secret.encode('ascii') if isinstance(secret, str) else secret
|
||||||
self.type = type or "org.freedesktop.Secret.Generic"
|
self.type = type or "org.freedesktop.Secret.Generic"
|
||||||
self.attributes = attributes
|
self.attributes = attributes
|
||||||
self.content_type = content_type
|
self.content_type = content_type
|
||||||
@ -376,7 +369,7 @@ class SecretCollection(dbus.service.Object):
|
|||||||
self.PropertiesChanged('org.freedesktop.Secret.Collection', { "Locked" : lock }, [])
|
self.PropertiesChanged('org.freedesktop.Secret.Collection', { "Locked" : lock }, [])
|
||||||
|
|
||||||
def perform_delete(self):
|
def perform_delete(self):
|
||||||
for item in self.items.values():
|
for item in list(self.items.values()):
|
||||||
item.perform_delete()
|
item.perform_delete()
|
||||||
del objects[self.path]
|
del objects[self.path]
|
||||||
self.service.remove_collection(self)
|
self.service.remove_collection(self)
|
||||||
@ -529,8 +522,7 @@ class SecretService(dbus.service.Object):
|
|||||||
name = self.bus.get_unique_name()
|
name = self.bus.get_unique_name()
|
||||||
if not name:
|
if not name:
|
||||||
raise NameError("No unique name available")
|
raise NameError("No unique name available")
|
||||||
print name
|
print(name, flush=True)
|
||||||
sys.stdout.flush()
|
|
||||||
loop.run()
|
loop.run()
|
||||||
|
|
||||||
def add_session(self, session):
|
def add_session(self, session):
|
||||||
@ -691,8 +683,8 @@ class SecretService(dbus.service.Object):
|
|||||||
def parse_options(args):
|
def parse_options(args):
|
||||||
try:
|
try:
|
||||||
opts, args = getopt.getopt(args, "", [])
|
opts, args = getopt.getopt(args, "", [])
|
||||||
except getopt.GetoptError, err:
|
except getopt.GetoptError as err:
|
||||||
print str(err)
|
print(str(err))
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
for o, a in opts:
|
for o, a in opts:
|
||||||
assert False, "unhandled option"
|
assert False, "unhandled option"
|
||||||
|
Loading…
Reference in New Issue
Block a user