Source code for cybox.objects.artifact_object

# Copyright (c) 2017, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.
import base64
import bz2
import zlib

from mixbox import entities
from mixbox import fields
from mixbox.vendor import six
from mixbox.compat import xor

import cybox.bindings.artifact_object as artifact_binding
from cybox.common import ObjectProperties, String, HashList


[docs]def validate_artifact_type(instance, value): if value is None: return elif value in Artifact.TYPES: return else: err = "Type must be one of %s. Received '%s'." % (Artifact.TYPES, value) raise ValueError(err)
[docs]def validate_byte_order_endianness(instance, value): if value is None: return elif value in RawArtifact.ENDIANNESS: return else: err = "Type must be one of %s. Received '%s'." % (RawArtifact.ENDIANNESS, value) raise ValueError(err)
[docs]class RawArtifact(String): _binding = artifact_binding _binding_class = _binding.RawArtifactType _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2' BIG_ENDIAN = "Big-endian" LITTLE_ENDIAN = "Little-endian" MIDDLE_ENDIAN = "Middle-endian" ENDIANNESS = (BIG_ENDIAN, LITTLE_ENDIAN, MIDDLE_ENDIAN) byte_order = fields.TypedField("byte_order", preset_hook=validate_byte_order_endianness)
[docs]class Compression(entities.Entity): """A Compression packaging layer Currently only zlib and bz2 are supported. Also, compression_mechanism_ref is not currently supported. """ _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2' _binding = artifact_binding _binding_class = _binding.CompressionType _COMPRESSION_TYPE = None # overridden by subclasses compression_mechanism = fields.TypedField("compression_mechanism") compression_mechanism_ref = fields.TypedField("compression_mechanism_ref") def __init__(self, compression_mechanism=None, compression_mechanism_ref=None): super(Compression, self).__init__() self.compression_mechanism = compression_mechanism self.compression_mechanism_ref = compression_mechanism_ref
[docs] def pack(self, data): """This should accept byte data and return byte data""" raise NotImplementedError()
[docs] def unpack(self, packed_data): """This should accept byte data and return byte data""" raise NotImplementedError()
[docs]class Encryption(entities.Entity): """ An encryption packaging layer. """ _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2' _binding = artifact_binding _binding_class = _binding.EncryptionType _ENCRYPTION_TYPE = None # overridden by subclasses encryption_mechanism = fields.TypedField("encryption_mechanism") encryption_mechanism_ref = fields.TypedField("encryption_mechanism_ref") encryption_key = fields.TypedField("encryption_key") encryption_key_ref = fields.TypedField("encryption_key_ref") def __init__(self, encryption_mechanism=None, encryption_key=None, encryption_mechanism_ref=None, encryption_key_ref=None): super(Encryption, self).__init__() self.encryption_mechanism = encryption_mechanism self.encryption_key = encryption_key self.encryption_mechanism_ref = encryption_mechanism_ref self.encryption_key_ref = encryption_key_ref
[docs] def pack(self, data): """This should accept byte data and return byte data""" raise NotImplementedError()
[docs] def unpack(self, packed_data): """This should accept byte data and return byte data""" raise NotImplementedError()
[docs]class Encoding(entities.Entity): """ An encoding packaging layer. Currently only base64 with a standard alphabet is supported. """ _binding = artifact_binding _binding_class = _binding.EncodingType _ENCODING_TYPE = None # overridden by subclasses algorithm = fields.TypedField("algorithm") character_set = fields.TypedField("character_set") custom_character_set_ref = fields.TypedField("custom_character_set_ref") def __init__(self, algorithm=None, character_set=None, custom_character_set_ref=None): super(Encoding, self).__init__() self.algorithm = algorithm self.character_set = character_set self.custom_character_set_ref = custom_character_set_ref
[docs] def pack(self, data): """This should accept byte data and return byte data""" raise NotImplementedError()
[docs] def unpack(self, packed_data): """This should accept byte data and return byte data""" raise NotImplementedError()
class EncryptionFactory(entities.EntityFactory): _ENCRYPTION_EXT_MAP = {} @classmethod def entity_class(cls, key): return cls._ENCRYPTION_EXT_MAP.get(key, Encryption) @classmethod def dictkey(cls, mapping): return mapping.get("encryption_mechanism") @classmethod def objkey(cls, obj): return obj.encryption_mechanism @classmethod def register_extension(cls, new_cls): cls._ENCRYPTION_EXT_MAP[new_cls._ENCRYPTION_TYPE] = new_cls return new_cls class CompressionFactory(entities.EntityFactory): _COMPRESSION_EXT_MAP = {} @classmethod def entity_class(cls, key): return cls._COMPRESSION_EXT_MAP.get(key, Compression) @classmethod def dictkey(cls, mapping): return mapping.get("compression_mechanism") @classmethod def objkey(cls, obj): return obj.compression_mechanism @classmethod def register_extension(cls, new_cls): cls._COMPRESSION_EXT_MAP[new_cls._COMPRESSION_TYPE] = new_cls return new_cls class EncodingFactory(entities.EntityFactory): _ENCODING_EXT_MAP = {} @classmethod def entity_class(cls, key): return cls._ENCODING_EXT_MAP.get(key, Encoding) @classmethod def dictkey(cls, mapping): return mapping.get("algorithm", "Base64") # default is Base64 @classmethod def objkey(cls, obj): return getattr(obj, "algorithm", "Base64") # default is Base64 @classmethod def register_extension(cls, new_cls): cls._ENCODING_EXT_MAP[new_cls._ENCODING_TYPE] = new_cls return new_cls @CompressionFactory.register_extension
[docs]class ZlibCompression(Compression): _COMPRESSION_TYPE = "zlib" def __init__(self): super(ZlibCompression, self).__init__(compression_mechanism="zlib")
[docs] def pack(self, data): return zlib.compress(data)
[docs] def unpack(self, packed_data): return zlib.decompress(packed_data)
@CompressionFactory.register_extension
[docs]class Bz2Compression(Compression): _COMPRESSION_TYPE = "bz2" def __init__(self): super(Bz2Compression, self).__init__(compression_mechanism="bz2")
[docs] def pack(self, data): return bz2.compress(data)
[docs] def unpack(self, packed_data): return bz2.decompress(packed_data)
@EncryptionFactory.register_extension
[docs]class XOREncryption(Encryption): _ENCRYPTION_TYPE = "xor" def __init__(self, key=None): super(XOREncryption, self).__init__( encryption_mechanism="xor", encryption_key=key )
[docs] def pack(self, data): return xor(data, self.encryption_key)
[docs] def unpack(self, packed_data): return xor(packed_data, self.encryption_key)
@EncryptionFactory.register_extension
[docs]class PasswordProtectedZipEncryption(Encryption): _ENCRYPTION_TYPE = "PasswordProtected" def __init__(self, key=None): super(PasswordProtectedZipEncryption, self).__init__( encryption_mechanism="PasswordProtected", encryption_key=key ) # `pack` is not implemented
[docs] def unpack(self, packed_data): from zipfile import ZipFile buf = six.StringIO(packed_data) with ZipFile(buf, 'r') as myzip: # Assume there is only one member in the archive, and that it # contains the artifact data. Ignore the name. filename = myzip.namelist()[0] data = myzip.read(filename, self.encryption_key) return data
@EncodingFactory.register_extension
[docs]class Base64Encoding(Encoding): _ENCODING_TYPE = "Base64" def __init__(self): super(Base64Encoding, self).__init__(algorithm="Base64")
[docs] def pack(self, data): return base64.b64encode(data)
[docs] def unpack(self, packed_data): return base64.b64decode(packed_data)
[docs]class Packaging(entities.Entity): """An individual packaging layer.""" _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2' _binding = artifact_binding _binding_class = _binding.PackagingType is_encrypted = fields.BooleanField("is_encrypted") is_compressed = fields.BooleanField("is_compressed") compression = fields.TypedField("Compression", Compression, factory=CompressionFactory, multiple=True) encryption = fields.TypedField("Encryption", Encryption, factory=EncryptionFactory, multiple=True) encoding = fields.TypedField("Encoding", Encoding, factory=EncodingFactory, multiple=True) def __init__(self, is_encrypted=None, is_compressed=None, compression=None, encryption=None, encoding=None): super(Packaging, self).__init__() self.is_encrypted = is_encrypted self.is_compressed = is_compressed self.compression = compression self.encryption = encryption self.encoding = encoding
[docs]class Artifact(ObjectProperties): # Warning: Do not attempt to get or set Raw_Artifact directly. Use `data` # or `packed_data` respectively. The Raw_Artifact value will be set on # export. You can set BaseObjectProperties or PatternFieldGroup attributes. _binding = artifact_binding _binding_class = _binding.ArtifactObjectType _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2' _XSI_NS = "ArtifactObj" _XSI_TYPE = "ArtifactObjectType" TYPE_FILE = "File" TYPE_MEMORY = "Memory Region" TYPE_FILE_SYSTEM = "File System Fragment" TYPE_NETWORK = "Network Traffic" TYPE_GENERIC = "Generic Data Region" TYPES = (TYPE_FILE, TYPE_FILE_SYSTEM, TYPE_GENERIC, TYPE_MEMORY, TYPE_NETWORK) hashes = fields.TypedField("Hashes", HashList) packaging = fields.TypedField("Packaging", Packaging) type_ = fields.TypedField("type_", key_name="type", preset_hook=validate_artifact_type) content_type = fields.TypedField("content_type") content_type_version = fields.TypedField("content_type_version") suspected_malicious = fields.TypedField("suspected_malicious") # TODO: xs:choice raw_artifact = fields.TypedField("Raw_Artifact", RawArtifact) raw_artifact_reference = fields.TypedField("Raw_Artifact_Reference") def __init__(self, data=None, type_=None): super(Artifact, self).__init__() self.type_ = type_ # `data` is the actual binary data that is being encoded in this # Artifact. It should use the `str` type on Python 2 or the `bytes` # type on Python 3. # `packed_data` is the literal character data that comes from (or # becomes) the contents of the Raw_Artifact element. It should be a # Unicode string (`unicode` on Python 2, `str` on Python 3), and should # in general be ASCII-encoded, since any other data should be # Base64-encoded. # Only one of these two attributes can be set directly. The other can # be calculated based on the various `Packaging` types added to this # Artifact. # We set the private attribute `_packed_data` first, so that the setter # for `data` has access to this attribute. self._packed_data = None self.data = data @property def data(self): """Should return a byte string""" if self._data: return self._data elif self._packed_data: tmp_data = self._packed_data.encode('ascii') if self.packaging: for p in reversed(self.packaging.encoding): tmp_data = p.unpack(tmp_data) for p in reversed(self.packaging.encryption): tmp_data = p.unpack(tmp_data) for p in reversed(self.packaging.compression): tmp_data = p.unpack(tmp_data) return tmp_data else: return None @data.setter def data(self, value): if self._packed_data: raise ValueError("packed_data already set, can't set data") if value is not None and not isinstance(value, six.binary_type): msg = ("Artifact data must be either None or byte data, not a " "Unicode string.") raise ValueError(msg) self._data = value @property def packed_data(self): """Should return a Unicode string""" if self._packed_data: return self._packed_data elif self._data: tmp_data = self._data if self.packaging: for p in self.packaging.compression: tmp_data = p.pack(tmp_data) for p in self.packaging.encryption: tmp_data = p.pack(tmp_data) for p in self.packaging.encoding: tmp_data = p.pack(tmp_data) return tmp_data.decode('ascii') else: return None @packed_data.setter def packed_data(self, value): if self._data: raise ValueError("data already set, can't set packed_data") if value is not None and not isinstance(value, six.text_type): msg = ("Artifact packed_data must be either None or a Unicode " "string, not byte data.") raise ValueError(msg) self._packed_data = value
[docs] def to_obj(self, ns_info=None): artifact_obj = super(Artifact, self).to_obj(ns_info=ns_info) if self.packed_data: if not self.raw_artifact: self.raw_artifact = RawArtifact() self.raw_artifact.value = self.packed_data artifact_obj.Raw_Artifact = self.raw_artifact.to_obj(ns_info=ns_info) return artifact_obj
[docs] def to_dict(self): artifact_dict = super(Artifact, self).to_dict() if self.packed_data: if not self.raw_artifact: self.raw_artifact = RawArtifact() self.raw_artifact.value = self.packed_data artifact_dict['raw_artifact'] = self.raw_artifact.to_dict() return artifact_dict
@classmethod
[docs] def from_obj(cls, cls_obj): if not cls_obj: return None artifact = super(Artifact, cls).from_obj(cls_obj) raw_artifact = cls_obj.Raw_Artifact if raw_artifact: artifact.raw_artifact = RawArtifact.from_obj(raw_artifact) artifact.packed_data = six.text_type(artifact.raw_artifact.value) return artifact
@classmethod
[docs] def from_dict(cls, cls_dict): if not cls_dict: return None artifact = super(Artifact, cls).from_dict(cls_dict) raw_artifact = cls_dict.get('raw_artifact') if raw_artifact: artifact.raw_artifact = RawArtifact.from_dict(raw_artifact) artifact.packed_data = six.text_type(artifact.raw_artifact.value) return artifact