news.stream

Stream

class factiva.news.stream.Stream(stream_id=None, snapshot_id=None, query='', user_key=None, user_stats=False)

Bases: object

Represent a Stream workflow for the Factiva API.

Parameters
  • stream_id (str) – ID of an existing stream; if given, neither a query nor a snapshot ID is needed

  • snapshot_id (str) – ID of a snapshot; if given, it is used to create a new stream

  • query (str) – query; if given, it is used to create a new stream

  • user_key (str) – the constructor assigns a stream user that has access to the Pub/Sub client, authentication headers and URLs

  Auth method 1 (all required):

  • user_key (str) – if a stream user is not passed, one can be created from the API key

  • user_stats (bool) – if a stream user is not passed, one can be created from the request info parameter

  Auth method 2 (all required):

  • user_id (str) – if a stream user is not passed, one can be created from the user ID

  • client_id (str) – if a stream user is not passed, one can be created from the client ID

  • password (str) – if a stream user is not passed, one can be created from the password

Examples

Creating a new Stream directly:
>>> stream_query_test = Stream(
...     user_key='abcd1234abcd1234abcd1234abcd1234',
...     user_stats=True,
...     snapshot_id='<snapshot-id>',
... )
>>> print(stream_query_test.create())
                      attributes                          id      relationships                                     type
job_status     JOB_STATE_PENDING  dj-synhub-extraction-...                NaN                                          stream
subscriptions                NaN  dj-synhub-extraction-...          {'data': [{'id': 'dj-synhub-extraction-...         stream

property all_subscriptions: List[str]

List all subscriptions to a stream.

consume_async_messages(callback=None, subscription_id=None, ack_enabled=False)

Consume news messages from a Pub/Sub subscription asynchronously.

Parameters
  • callback (function) – function used to process each message

  • subscription_id (str) – ID of the subscription used to connect to Pub/Sub

  • ack_enabled (boolean) – whether messages are acknowledged

Raises

ValueError – when the subscription ID is invalid

consume_messages(callback=None, subscription_id=None, maximum_messages=None, batch_size=None, ack_enabled=False)

Consume news messages from a Pub/Sub subscription synchronously.

Parameters
  • callback (function) – function used to process each message

  • subscription_id (str) – ID of the subscription used to connect to Pub/Sub

  • maximum_messages (int) – number of messages to consume

  • batch_size (int) – maximum number of messages expected per batch

  • ack_enabled (boolean) – whether messages are acknowledged

Raises

ValueError – when the subscription ID is invalid
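The callback passed to consume_messages handles one message at a time. The sketch below is a hypothetical callback (the ActionTally name is illustrative, not part of the library) that counts messages per action value. It assumes the callback is invoked as callback(message, subscription_id) with a dict message, as in the Listener example on this page.

```python
from collections import Counter


class ActionTally:
    """Hypothetical callback that counts messages per 'action' value."""

    def __init__(self):
        self.counts = Counter()
        self.seen_ids = []

    def __call__(self, message, subscription_id):
        # Assumes message is a dict like the Listener sample, e.g. {'an': ..., 'action': 'rep'}.
        self.counts[message.get('action', 'unknown')] += 1
        # Remember which article IDs ('an') were handled, in arrival order.
        self.seen_ids.append(message.get('an'))
```

It could then be used as tally = ActionTally() followed by stream.consume_messages(callback=tally, subscription_id='<subscription-id>', maximum_messages=10), after which tally.counts holds the totals.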

create() → StreamResponse

Create a stream instance.

There are two available options:
  • Create a stream using a query

  • Create a stream using a snapshot ID

Returns

All information about the current stream

Return type

StreamResponse

Raises

ValueError – when both snapshot_id and query are undefined
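The documented rules for choosing a stream's source can be expressed as a small pre-flight check. This is a hypothetical helper, not part of factiva.news.stream; it assumes that an existing stream_id takes priority and that snapshot_id is preferred over query when both are given, which this page does not specify.

```python
def check_stream_source(stream_id=None, snapshot_id=None, query=''):
    """Hypothetical check mirroring the documented Stream rules.

    Returns the name of the input that identifies the stream, or raises
    ValueError when a new stream would have neither a snapshot ID nor a query.
    """
    if stream_id:
        # An existing stream needs no query or snapshot ID.
        return 'stream_id'
    if snapshot_id:
        # Assumed precedence: snapshot_id before query.
        return 'snapshot_id'
    if query:
        return 'query'
    raise ValueError('snapshot_id and query are undefined')
```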

create_default_subscription(response)

Create the default subscriptions at initialization.

Adds the subscriptions to the subscriptions dict.

Parameters

response (dict) – response used to set every subscription that exists inside the stream

create_subscription() → str

Create another subscription for an existing stream.

Returns

The new subscription ID

Return type

str

Raises

RuntimeError – when unable to create a subscription

delete() → StreamResponse

Delete a stream.

Returns

All information about the current stream, whose state is expected to be CANCELLED

Return type

StreamResponse

Raises
  • ValueError – when the stream ID is undefined

  • RuntimeError – when the stream does not exist

  • RuntimeError – when an unexpected HTTP error occurs

delete_subscription(sus_id) → bool

Delete a subscription from an existing stream.

Parameters

sus_id (str) – ID of the subscription to delete

Returns

Whether the deletion was successful

Return type

bool

Raises
  • ValueError – when the subscription ID is invalid

  • RuntimeError – when unable to delete a subscription
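Because create_subscription() returns the new ID and delete_subscription(sus_id) removes it, a short-lived extra subscription can be wrapped in a context manager so it is always cleaned up. temporary_subscription is a hypothetical helper that relies only on those two documented methods; any object providing them works, which also makes it easy to test with a stand-in.

```python
from contextlib import contextmanager


@contextmanager
def temporary_subscription(stream):
    """Hypothetical helper: create an extra subscription and delete it on exit.

    Relies only on create_subscription() -> str and delete_subscription(sus_id) -> bool.
    """
    sus_id = stream.create_subscription()
    try:
        yield sus_id
    finally:
        # Runs even if the with-body raises, so the subscription is not left behind.
        stream.delete_subscription(sus_id)
```

A possible usage is with temporary_subscription(stream) as sus_id: followed by stream.consume_messages(subscription_id=sus_id, maximum_messages=10) inside the block.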

get_all_streams() → dict

Obtain the streams of a given user.

Returns

JSON object with a list of objects containing information about every stream (ID, link, state, etc.)

Return type

dict

Raises

RuntimeError – when an unexpected HTTP error occurs

get_info() → StreamResponse

Query a stream by its ID.

Returns

All information about the current stream

Return type

StreamResponse

Raises
  • ValueError – when the stream ID is undefined

  • RuntimeError – when the stream does not exist

  • RuntimeError – when an unexpected HTTP error occurs

get_suscription_by_id(susbcription_id) → Subscription
get_suscription_by_index(index) → Subscription
get_suscription_id_by_index(index) → str
listener = None
set_all_subscriptions()

Allow a user to set all subscriptions from a stream to local storage.

Return type

DataFrame which contains the state of the current stream

Raises

ValueError – when the stream ID is undefined

snapshot_id = None
stream_id = None
property stream_url: str

URL address of the stream.

stream_user = None
subscriptions = {}

Subscription

class factiva.news.stream.Subscription(stream_id=None, id=None, subscription_type=None)

Bases: object

Represent a Subscription inside a stream.

Class that represents a Subscription inside a stream. There are two possible operations for a Subscription:

  • Create a new one based on an existing Stream

  • Delete an existing subscription from a Stream

Parameters
  • url (str) – URL used to create or delete a subscription

  • stream_id (str) – ID of a given stream

Raises

ValueError – when stream_id is undefined

Examples

Creating a new Subscription directly:
>>> subscription = Subscription(stream_id='<stream-id>')
>>> created_subs = subscription.create(
...     headers={'authorization': 'user-key'}
... )
>>> print(created_subs)
{ "id": "dj-synhub-extraction-*HH**", "type": "subscription" }

SUBSCRIPTION_IDX = 0
consume_async_messages(callback=None, ack_enabled=False)

Consume messages asynchronously from a Pub/Sub subscription.

This listener function consumes the current news messages from a Pub/Sub subscription asynchronously.

Parameters
  • callback (function) – function used to process each message

  • ack_enabled (boolean) – whether messages are acknowledged

Raises

RuntimeError – when the listener has not been initialized yet

consume_messages(callback=None, maximum_messages=None, batch_size=None, ack_enabled=False)

Consume messages from a Pub/Sub subscription synchronously.

This listener function consumes the current news messages from a Pub/Sub subscription synchronously.

Parameters
  • callback (function) – function used to process each message

  • maximum_messages (int) – number of messages to consume

  • batch_size (int) – maximum number of messages expected per batch

  • ack_enabled (boolean) – whether messages are acknowledged

Raises

RuntimeError – when the listener has not been initialized yet

create(headers=None)

Create a subscription for a given stream instance.

Allows a user to create another subscription for a given stream.

Parameters

headers (dict) – headers containing the token/access key for authorization

Returns

Data containing the ID and type of the created subscription

Raises
  • ValueError – when stream_id is undefined

  • RuntimeError – when an unexpected API response is received

create_listener(user)

Create a listener in a separate step.

Creates the listener in a separate step to avoid an undefined subscription ID.

Parameters

user (StreamUser) – user with access to the credentials and client needed by the listener

Raises

RuntimeError – when user is not a StreamUser

delete(headers=None) → bool

Delete a subscription from a given stream.

Allows a user to delete a subscription from a given stream.

Parameters

headers (dict) – headers containing the token/access key for authorization

Return type

bool value which shows whether the subscription was completely deleted

Raises

RuntimeError – when an unexpected API response is received

id = None
listener = None
stream_id = None
subscription_type = None

Listener

class factiva.news.stream.Listener(subscription_id=None, stream_user=None)

Bases: object

Class that represents a Listener for Google Pub/Sub.

Parameters
  • stream_user (StreamUser) –

    the constructor assigns a stream user that has access to the proper URL and headers, which are used for:

    • Checking whether the document count has been exceeded

    • Consuming messages (articles) synchronously

    • Consuming messages (articles) asynchronously

  • subscription_id (str) – ID used by Pub/Sub to consume messages synchronously or asynchronously

Examples

Creating a new Listener directly:
>>> listener = Listener(
...     subscription_id='<subscription-id>',
...     stream_user=StreamUser(
...         user_key='****************************1234',
...         request_info=False,
...         user_id='******-svcaccount@dowjones.com',
...         client_id='****************************5678',
...         password='*******'
...     )
... )
>>> def callback(message, subscription_id):
...     print('Subscription ID: {}: Message: {}'.format(
...         subscription_id, message
...     ))
...
>>> print(listener.listen(
...     callback,
...     maximum_messages=10
... ))
Received news message with ID: DJDN******************
Subscription ID: dj-synhub-stream-********************-
km******-filtered-******:
Message: {'an': 'DJDN0*********************',
'document_type': 'article', 'action': 'rep',
'source_code': 'DJDN', 'source_name': 'Dow Jones ---- -----',
'publication_date': '2021-05-20T08:00:10.255Z',
'publication_datetime': '2021-05-20T08:00:10.255Z',
'modification_date': '2021-05-20T08:04:56.175Z',
'modification_datetime': '2021-05-20T08:02:54.000Z',
'ingestion_datetime': '2021-05-20T08:00:13.000Z',
'title': "----- ------ ------ ------ 2020",
'snippet': '', 'body': "\nOn Thursday -- --- ----,
--- Plc. announced its ----- ---- --- \n
......
}
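The sample message above carries ISO 8601 timestamps with millisecond precision and a trailing Z. As a sketch, a callback can parse them to measure how long after publication an article was ingested. The helper names and the lag metric are hypothetical; only the field names come from the sample output.

```python
from datetime import datetime, timezone

# Timestamp layout used by the *_datetime fields in the sample message.
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def parse_timestamp(value):
    """Parse a value such as '2021-05-20T08:00:10.255Z' as an aware UTC datetime."""
    return datetime.strptime(value, TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)


def ingestion_lag_seconds(message):
    """Seconds between publication and ingestion (hypothetical metric)."""
    published = parse_timestamp(message['publication_datetime'])
    ingested = parse_timestamp(message['ingestion_datetime'])
    return (ingested - published).total_seconds()


def lag_callback(message, subscription_id):
    """Callback with the same (message, subscription_id) shape as the example."""
    print(f'{subscription_id}: {message["an"]} ingested '
          f'{ingestion_lag_seconds(message):.3f}s after publication')
```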

FIRST_OBJECT = 0
check_exceeded_thread()

Check exceeded thread function.

Creates threads that check whether the document count has been exceeded.
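The exceeded-document check runs on its own thread. The sketch below shows one common way to build such a checker with threading.Event; it is hypothetical (get_count, limit and on_exceeded are illustrative parameters) and does not reproduce the library's implementation.

```python
import threading


def start_limit_checker(get_count, limit, on_exceeded, interval=1.0):
    """Hypothetical sketch: poll get_count() on a daemon thread.

    Calls on_exceeded(count) once when the count exceeds limit, then stops.
    Returns a threading.Event; setting it stops the checker early.
    """
    stop = threading.Event()

    def run():
        while not stop.is_set():
            count = get_count()
            if count > limit:
                on_exceeded(count)
                return
            # Sleep for interval seconds, waking immediately if stop is set.
            stop.wait(interval)

    threading.Thread(target=run, daemon=True).start()
    return stop
```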

listen(callback=<function default_callback>, maximum_messages=None, batch_size=10, ack_enabled=False)

Listen function.

Listens for the current news messages from a Pub/Sub subscription synchronously.

Parameters
  • callback (function) – function used to process each message

  • maximum_messages (int) – number of messages to consume

  • batch_size (int) – maximum number of messages expected per batch

  • ack_enabled (boolean) – flag for acknowledging messages

Raises
  • ValueError – when maximum_messages is undefined

  • GoogleAPICallError – when there is no valid instance to pull from, or when something unexpected happens with the Pub/Sub client
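To illustrate how maximum_messages and batch_size relate, the sketch below pulls messages in batches no larger than batch_size until maximum_messages have been handed to the callback. pull(n) is an assumed stand-in for the Pub/Sub pull call, and the loop is not the library's code; only the ValueError for an undefined maximum_messages mirrors the documented behavior.

```python
def consume_in_batches(pull, callback, maximum_messages=None, batch_size=10):
    """Hypothetical sketch of batched, capped consumption.

    pull(n) is assumed to return up to n messages, or an empty list when
    none are available. Returns how many messages were passed to callback.
    """
    if maximum_messages is None:
        # Mirrors the documented ValueError for an undefined maximum_messages.
        raise ValueError('maximum_messages is undefined')
    consumed = 0
    while consumed < maximum_messages:
        # Never request more than the batch size or the remaining quota.
        batch = pull(min(batch_size, maximum_messages - consumed))
        if not batch:
            break
        for message in batch:
            callback(message)
            consumed += 1
    return consumed
```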

listen_async(callback=<function default_callback>, ack_enabled=False)

Listen async function.

Listens for the current news messages from a Pub/Sub subscription asynchronously.

Parameters
  • callback (function) – function used to process each message

  • ack_enabled (boolean) – flag for acknowledging messages
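Asynchronous consumers may invoke the callback from more than one thread; Google's Pub/Sub subscriber client, for instance, runs callbacks on a thread pool by default. Whether listen_async does the same is not stated here, so the hypothetical ThreadSafeCollector below guards its shared state with a lock to be safe either way.

```python
import threading


class ThreadSafeCollector:
    """Hypothetical callback that stores (subscription_id, message) pairs under a lock."""

    def __init__(self, limit=None):
        self._lock = threading.Lock()
        self._messages = []
        self.limit = limit

    def __call__(self, message, subscription_id):
        with self._lock:
            # Drop messages beyond the optional limit instead of growing forever.
            if self.limit is None or len(self._messages) < self.limit:
                self._messages.append((subscription_id, message))

    def snapshot(self):
        # Return a copy so callers never iterate over the list while it changes.
        with self._lock:
            return list(self._messages)
```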

property stream_id_uri

Property for retrieving the stream ID URI.