# Copyright (c) 2015, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.
import cybox
import cybox.bindings.cybox_core as core_binding
from cybox.common import MeasureSource, ObjectProperties, StructuredText
from cybox.core import Object, Event
[docs]class Observable(cybox.Entity):
"""A single Observable.
"""
_binding = core_binding
_namespace = 'http://cybox.mitre.org/cybox-2'
def __init__(self, item=None, id_=None, idref=None, title=None, description=None):
"""Create an Observable out of 'item'.
`item` can be any of:
- an Object
- an Event
- an ObservableComposition
- any subclass of ObjectProperties.
In the first three cases, the appropriate property of the Observable
will be set. In the last cases, an Object will be built automatically
to ensure the correct hierarchy is created.
"""
super(Observable, self).__init__()
if not id_ and not idref:
id_ = cybox.utils.create_id(prefix="Observable")
self.id_ = id_
self.title = title
self.description = description
self.object_ = None
self.event = None
self.observable_composition = None
self.idref = idref
self.sighting_count = None
self.observable_source = []
self.keywords = Keywords()
self.pattern_fidelity = None
if item is None:
return
elif isinstance(item, Object):
self.object_ = item
elif isinstance(item, ObservableComposition):
self.observable_composition = item
elif isinstance(item, Event):
self.event = item
elif isinstance(item, ObjectProperties):
if item.parent:
self.object_ = item.parent
else:
self.object_ = Object(item)
else:
msg = ("item must be an Object, Event, ObservableComposition, or "
"subclass of ObjectProperties. Received an %s" % type(item))
raise TypeError(msg)
@property
def id_(self):
return self._id
@id_.setter
def id_(self, value):
if not value:
self._id = None
else:
self._id = value
self.idref = None
@property
def idref(self):
return self._idref
@idref.setter
def idref(self, value):
if not value:
self._idref = None
else:
self._idref = value
self.id_ = None # unset id_ if idref is present
@property
def object_(self):
return self._object
@object_.setter
def object_(self, value):
if value:
if self.event:
msg = 'Observable already has an Event.'
raise ValueError(msg)
elif self.observable_composition:
msg = 'Observable already has an ObservableComposition.'
raise ValueError(msg)
if not isinstance(value, Object):
raise TypeError('value must be an Object')
self._object = value
@property
def event(self):
return self._event
@event.setter
def event(self, value):
if value:
if self.object_:
raise ValueError('Observable already has an Object.')
elif self.observable_composition:
msg = 'Observable already has an ObservableComposition.'
raise ValueError(msg)
if not isinstance(value, Event):
raise TypeError('value must be an Event')
self._event = value
@property
def observable_composition(self):
return self._observable_composition
@observable_composition.setter
def observable_composition(self, value):
if value:
if self.object_:
raise ValueError('Observable already has an Object.')
elif self.event:
msg = 'Observable already has an Event.'
raise ValueError(msg)
if not isinstance(value, ObservableComposition):
raise TypeError('value must be an ObservableComposition')
self._observable_composition = value
@property
def description(self):
return self._description
@description.setter
def description(self, value):
if value is not None and not isinstance(value, StructuredText):
value = StructuredText(value)
self._description = value
[docs] def add_keyword(self, value):
self.keywords.append(value)
[docs] def to_obj(self, return_obj=None, ns_info=None):
self._collect_ns_info(ns_info)
obs_obj = core_binding.ObservableType()
obs_obj.id = self.id_
if self.title is not None:
obs_obj.Title = self.title
if self.description is not None:
obs_obj.Description = self.description.to_obj(ns_info=ns_info)
if self.object_:
obs_obj.Object = self.object_.to_obj(ns_info=ns_info)
if self.event:
obs_obj.Event = self.event.to_obj(ns_info=ns_info)
if self.observable_composition:
obs_obj.Observable_Composition = self.observable_composition.to_obj(ns_info=ns_info)
if self.idref is not None:
obs_obj.idref = self.idref
if self.sighting_count is not None:
obs_obj.sighting_count = self.sighting_count
if self.observable_source:
obs_obj.Observable_Source = [x.to_obj(ns_info=ns_info) for x in self.observable_source]
if self.keywords:
obs_obj.Keywords = self.keywords.to_obj(ns_info=ns_info)
if self.pattern_fidelity:
obs_obj.Pattern_Fidelity = self.pattern_fidelity.to_obj(ns_info=ns_info)
return obs_obj
[docs] def to_dict(self):
obs_dict = {}
if self.id_ is not None:
obs_dict['id'] = self.id_
if self.title is not None:
obs_dict['title'] = self.title
if self.description is not None:
obs_dict['description'] = self.description.to_dict()
if self.object_:
obs_dict['object'] = self.object_.to_dict()
if self.event:
obs_dict['event'] = self.event.to_dict()
if self.observable_composition:
obs_dict['observable_composition'] = self.observable_composition.to_dict()
if self.idref is not None:
obs_dict['idref'] = self.idref
if self.sighting_count is not None:
obs_dict['sighting_count'] = self.sighting_count
if self.observable_source:
obs_dict['observable_source'] = [x.to_dict() for x in self.observable_source]
if self.keywords:
obs_dict['keywords'] = self.keywords.to_dict()
if self.pattern_fidelity:
obs_dict['pattern_fidelity'] = self.pattern_fidelity.to_dict()
return obs_dict
@staticmethod
[docs] def from_obj(observable_obj):
if not observable_obj:
return None
from cybox.core import PatternFidelity
obs = Observable()
obs.id_ = observable_obj.id
obs.title = observable_obj.Title
obs.description = StructuredText.from_obj(observable_obj.Description)
obs.object_ = Object.from_obj(observable_obj.Object)
obs.event = Event.from_obj(observable_obj.Event)
obs.observable_composition = ObservableComposition.from_obj(observable_obj.Observable_Composition)
obs.idref = observable_obj.idref
obs.sighting_count = observable_obj.sighting_count
if observable_obj.Observable_Source:
obs.observable_source = [MeasureSource.from_obj(x) for x in observable_obj.Observable_Source]
obs.keywords = Keywords.from_obj(observable_obj.Keywords)
obs.pattern_fidelity = PatternFidelity.from_obj(observable_obj.Pattern_Fidelity)
return obs
@staticmethod
[docs] def from_dict(observable_dict):
if not observable_dict:
return None
from cybox.core import PatternFidelity
obs = Observable()
obs.id_ = observable_dict.get('id')
obs.title = observable_dict.get('title')
obs.description = StructuredText.from_dict(observable_dict.get('description'))
obs.object_ = Object.from_dict(observable_dict.get('object'))
obs.event = Object.from_dict(observable_dict.get('event'))
obs.observable_composition = ObservableComposition.from_dict(observable_dict.get('observable_composition'))
obs.idref = observable_dict.get('idref')
obs.sighting_count = observable_dict.get('sighting_count')
if observable_dict.get('observable_source'):
obs.observable_source = [MeasureSource.from_dict(x) for x in observable_dict.get('observable_source')]
obs.keywords = Keywords.from_dict(observable_dict.get('keywords'))
obs.pattern_fidelity = PatternFidelity.from_dict(observable_dict.get('pattern_fidelity'))
return obs
[docs]class Observables(cybox.EntityList):
"""The root CybOX Observables object.
Pools are not currently supported.
"""
_binding = core_binding
_contained_type = Observable
_namespace = 'http://cybox.mitre.org/cybox-2'
def __init__(self, observables=None):
super(Observables, self).__init__()
# Assume major_verion and minor_version are immutable for now
self._major_version = 2
self._minor_version = 1
self._update_version = 0
self.observable_package_source = None
self.observables = []
try:
for obs in observables:
self.add(obs)
except TypeError:
# A single observable
self.add(observables)
@property
def observables(self):
return self._inner
@observables.setter
def observables(self, value):
self._inner = value
[docs] def add(self, observable):
if not observable:
return
if not isinstance(observable, Observable):
observable = Observable(observable)
self.observables.append(observable)
[docs] def to_obj(self, return_obj=None, ns_info=None):
self._collect_ns_info(ns_info)
observables_obj = core_binding.ObservablesType(
cybox_major_version=self._major_version,
cybox_minor_version=self._minor_version,
cybox_update_version=self._update_version)
#Required
observables_obj.Observable = [x.to_obj(ns_info=ns_info) for x in self.observables]
#Optional
if self.observable_package_source:
observables_obj.Observable_Package_Source = self.observable_package_source.to_obj(ns_info=ns_info)
return observables_obj
[docs] def to_dict(self):
observables_dict = {}
#Required
observables_dict['major_version'] = self._major_version
observables_dict['minor_version'] = self._minor_version
observables_dict['update_version'] = self._update_version
observables_dict['observables'] = [x.to_dict() for x in self.observables]
#Optional
if self.observable_package_source:
observables_dict['observable_package_source'] = self.observable_package_source.to_dict()
return observables_dict
@staticmethod
[docs] def from_obj(observables_obj):
if not observables_obj:
return None
#TODO: look at major_version and minor_version
obs = Observables()
# get_Observable() actually returns a list
for o in observables_obj.Observable:
obs.add(Observable.from_obj(o))
obs.observable_package_source = MeasureSource.from_obj(observables_obj.Observable_Package_Source)
return obs
@staticmethod
[docs] def from_dict(observables_dict):
if observables_dict is None:
return None
#TODO: look at major_version and minor_version
obs = Observables()
for o in observables_dict.get("observables", []):
obs.add(Observable.from_dict(o))
obs.observable_package_source = MeasureSource.from_dict(observables_dict.get('observable_package_source'))
return obs
[docs]class ObservableComposition(cybox.Entity):
'''The ObservableCompositionType entity defines a logical compositions of
CybOX Observables. The combinatorial behavior is derived from the operator
property.'''
_namespace = 'http://cybox.mitre.org/cybox-2'
OPERATOR_AND = 'AND'
OPERATOR_OR = 'OR'
OPERATORS = (OPERATOR_AND, OPERATOR_OR)
def __init__(self, operator='AND', observables=None):
super(ObservableComposition, self).__init__()
self.operator = operator
self.observables = []
if observables:
try:
for obs in observables:
self.add(obs)
except TypeError as t:
# A single observable
self.add(observables)
@property
def operator(self):
return self._operator
@operator.setter
def operator(self, value):
if value not in self.OPERATORS:
raise ValueError('value must be one of: %s' % ' '.join(self.OPERATORS) )
self._operator = value
[docs] def add(self, observable):
if not observable:
raise ValueError("'observable' must not be None")
if not isinstance(observable, Observable):
observable = Observable(observable)
self.observables.append(observable)
[docs] def to_obj(self, return_obj=None, ns_info=None):
self._collect_ns_info(ns_info)
observable_list = [x.to_obj(ns_info=ns_info) for x in self.observables]
return core_binding.ObservableCompositionType(
operator = self._operator,
Observable=observable_list)
[docs] def to_dict(self):
return {
'operator': self._operator,
'observables': [x.to_dict() for x in self.observables]
}
@staticmethod
[docs] def from_obj(observable_comp_obj):
if not observable_comp_obj:
return None
obs_comp = ObservableComposition()
obs_comp.operator = observable_comp_obj.operator
# get_Observable() actually returns a list
for o in observable_comp_obj.Observable:
obs_comp.add(Observable.from_obj(o))
return obs_comp
@staticmethod
[docs] def from_dict(observable_comp_dict):
if not observable_comp_dict:
return None
obs_comp = ObservableComposition()
obs_comp.operator = observable_comp_dict.get('operator', 'AND')
for o in observable_comp_dict.get("observables", []):
obs_comp.add(Observable.from_dict(o))
return obs_comp
[docs]class Keywords(cybox.EntityList):
_binding = core_binding
_binding_class = core_binding.KeywordsType
_binding_var = "Keyword"
_contained_type = cybox.Unicode
_namespace = 'http://cybox.mitre.org/cybox-2'