news.stream
Stream
- class factiva.news.stream.Stream(stream_id=None, snapshot_id=None, query='', user_key=None, user_stats=False)
Bases: object
Represent a Stream workflow for Factiva API.
- Parameters
stream_id (str) – id of an existing stream; when provided, neither a query nor a snapshot id is needed
snapshot_id (str) – id of a snapshot; when provided, it is used to create a new stream
query (str) – query string; when provided, it is used to create a new stream
user_key (str) – key from which the constructor assigns a stream user, which has access to the Pub/Sub client, authentication headers, and URLs
Auth method 2 (all required) –
user_key – if a user_key is not passed, the stream user can be created from the API key parameter
user_stats (bool) – if a user_key is not passed, the stream user can be created from the request info parameter
user_id (str) – if a user_key is not passed, the stream user can be created from the user id parameter
client_id (str) – if a user_key is not passed, the stream user can be created from the client id parameter
password (str) – if a user_key is not passed, the stream user can be created from the password parameter
Examples
- Creating a new Stream directly:
>>> stream_query_test = Stream(
...     user_key='abcd1234abcd1234abcd1234abcd1234',
...     user_stats=True,
...     snapshot_id='<snapshot-id>',
... )
>>> print(stream_query_test.create())
                      attributes                        id                               relationships    type
job_status     JOB_STATE_PENDING  dj-synhub-extraction-...                                         NaN  stream
subscriptions                NaN  dj-synhub-extraction-...  {'data': [{'id': 'dj-synhub-extraction-...  stream
- property all_subscriptions: List[str]
List all subscriptions to a stream.
- consume_async_messages(callback=None, subscription_id=None, ack_enabled=False)
Consume messages (news) from a Pub/Sub subscription asynchronously.
- Parameters
callback (function) – function used to process each message
subscription_id (str) – id of the Pub/Sub subscription to connect to
ack_enabled (boolean) – whether to acknowledge each message
- Raises
ValueError – when the subscription id is invalid
- consume_messages(callback=None, subscription_id=None, maximum_messages=None, batch_size=None, ack_enabled=False)
Consume messages (news) from a Pub/Sub subscription synchronously.
- Parameters
callback (function) – function used to process each message
subscription_id (str) – id of the Pub/Sub subscription to connect to
maximum_messages (int) – number of messages to consume
batch_size (int) – maximum number of messages per batch
ack_enabled (boolean) – whether to acknowledge each message
- Raises
ValueError – when the subscription id is invalid
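A callback for the consume methods might look like the minimal sketch below. It assumes the two-argument form callback(message, subscription_id) used in the Listener example in this module, and that each message is a dict with keys such as 'an' and 'title'. The make_collector helper and the sample payload are hypothetical, and the return value the library expects from a callback is not stated here.

```python
def make_collector(store):
    """Build a callback that records (subscription_id, an, title) tuples in `store`."""
    def callback(message, subscription_id):
        # Assumption: `message` is a dict shaped like the article payload
        # shown in the Listener example ('an', 'title', ...).
        store.append((subscription_id, message.get('an'), message.get('title')))
    return callback


collected = []
on_message = make_collector(collected)

# Hypothetical payload standing in for a message delivered by Pub/Sub.
on_message({'an': 'DJDN000', 'title': 'Sample headline'}, 'sub-1')

# With a real stream the callback would be passed along these lines:
#   stream.consume_messages(callback=on_message,
#                           subscription_id='<subscription-id>',
#                           maximum_messages=10)
```

Keeping the storage outside the callback makes it easy to inspect what was received once consumption stops.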
- create() -> StreamResponse
Create a stream instance.
There are two available options: create a stream using a query, or create a stream using a snapshot id.
- Returns
all information about the current stream
- Return type
StreamResponse
- Raises
ValueError – when snapshot_id and query are both undefined
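The rule that create() needs either a query or a snapshot id can be sketched as follows. This is an illustration only: choose_creation_mode is a hypothetical helper, not part of the library, and the precedence given to snapshot_id when both values are set is an assumption that the documentation does not confirm.

```python
def choose_creation_mode(snapshot_id=None, query=''):
    """Return which input a new stream would be built from (hypothetical helper)."""
    # Assumption: a snapshot id wins when both are given; the source does not say.
    if snapshot_id:
        return 'snapshot'
    if query:
        return 'query'
    # Mirrors the documented ValueError when neither value is defined.
    raise ValueError('snapshot_id and query are undefined')


mode_from_snapshot = choose_creation_mode(snapshot_id='<snapshot-id>')
mode_from_query = choose_creation_mode(query='<query>')
```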
- create_default_subscription(response)
Create the default subscriptions at initialization.
Adds the subscriptions to the subscriptions dict.
- Parameters
response (dict) – response used to register every subscription that exists inside the stream
- create_subscription() -> str
Create another subscription for an existing stream.
- Returns
the new subscription id
- Return type
str
- Raises
RuntimeError – when unable to create a subscription
- delete() -> StreamResponse
Delete a stream.
- Returns
all information about the current stream, whose state is expected to be CANCELLED
- Return type
StreamResponse
- Raises
ValueError – when the stream id is undefined
RuntimeError – when the stream does not exist
RuntimeError – when an unexpected HTTP error occurs
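Because delete() documents both ValueError and RuntimeError, a caller may want to tell the two apart. The sketch below shows one way to do that. FakeStream is a stand-in so the example runs without credentials, and safe_delete and its status strings are hypothetical names, not part of the library.

```python
def safe_delete(stream):
    """Call stream.delete() and map the documented exceptions to a status string."""
    try:
        stream.delete()
    except ValueError:
        # Documented case: the stream id is undefined.
        return 'missing-stream-id'
    except RuntimeError as err:
        # Documented cases: the stream does not exist, or an unexpected HTTP error.
        return 'failed: {}'.format(err)
    return 'deleted'


class FakeStream:
    """Stand-in for Stream; raises the configured error from delete(), if any."""
    def __init__(self, error=None):
        self.error = error

    def delete(self):
        if self.error is not None:
            raise self.error


status_ok = safe_delete(FakeStream())
status_missing = safe_delete(FakeStream(ValueError('stream id is undefined')))
status_failed = safe_delete(FakeStream(RuntimeError('stream does not exist')))
```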
- delete_subscription(sus_id) -> bool
Delete a subscription from an existing stream.
- Parameters
sus_id (str) – id of the subscription to delete
- Returns
whether the deletion succeeded
- Return type
bool
- Raises
ValueError – when the subscription id is invalid
RuntimeError – when unable to delete a subscription
- get_all_streams() -> dict
Obtain the streams of a given user.
- Return type
JSON object: a list of objects containing information about every stream (id, link, state, etc.)
- Raises
RuntimeError – when an unexpected HTTP error occurs
- get_info() -> StreamResponse
Query a stream by its id.
- Returns
all information about the current stream
- Return type
StreamResponse
- Raises
ValueError – when the stream id is undefined
RuntimeError – when the stream does not exist
RuntimeError – when an unexpected HTTP error occurs
- get_suscription_by_id(susbcription_id) -> Subscription
- get_suscription_by_index(index) -> Subscription
- get_suscription_id_by_index(index) -> str
- listener = None
- set_all_subscriptions()
Set all subscriptions from a stream in local storage.
- Return type
DataFrame containing the state of the current stream
- Raises
ValueError – when the stream id is undefined
- snapshot_id = None
- stream_id = None
- property stream_url: str
The stream's URL address.
- stream_user = None
- subscriptions = {}
Subscription
- class factiva.news.stream.Subscription(stream_id=None, id=None, subscription_type=None)
Bases: object
Represent a Subscription inside a stream.
There are two possible operations for a Subscription: create a new one based on an existing Stream, or delete an existing one from a Stream.
- Parameters
url (str) – URL used to create or delete a subscription
stream_id (str) – id of the stream the subscription belongs to
- Raises
ValueError – when stream_id is undefined
Examples
- Creating a new Subscription directly:
>>> subscription = Subscription('<stream_id>')
>>> created_subs = subscription.create(
...     headers={'authorization': 'user-key'}
... )
>>> print(created_subs)
{
    "id": "dj-synhub-extraction-*HH**",
    "type": "subscription"
}
- SUBSCRIPTION_IDX = 0
- consume_async_messages(callback=None, ack_enabled=False)
Consume messages asynchronously through the listener.
Listener function that consumes the current messages (news) from a Pub/Sub subscription asynchronously.
- Parameters
callback (function) – function used to process each message
ack_enabled (boolean) – whether to acknowledge each message
- Raises
RuntimeError – when the listener is not yet initialized
- consume_messages(callback=None, maximum_messages=None, batch_size=None, ack_enabled=False)
Consume messages from a Pub/Sub subscription synchronously.
Listener function that consumes the current messages (news) from a Pub/Sub subscription synchronously.
- Parameters
callback (function) – function used to process each message
maximum_messages (int) – number of messages to consume
batch_size (int) – maximum number of messages per batch
ack_enabled (boolean) – whether to acknowledge each message
- Raises
RuntimeError – when the listener is not yet initialized
- create(headers=None)
Create a subscription for a given stream instance.
Allows a user to create another subscription to a given stream.
- Parameters
headers (dict) – headers containing the token or access key used for authorization
- Returns
data containing the id and type of the created subscription
- Raises
ValueError – when stream_id is undefined
RuntimeError – when the API returns an unexpected response
- create_listener(user)
Create a listener in a separate step.
Creating the listener separately avoids working with an undefined subscription id.
- Parameters
user (StreamUser) – user that holds the credentials and client needed by the listener
- Raises
RuntimeError – when user is not a StreamUser
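The documented RuntimeError cases imply an ordering: a listener has to exist before messages can be consumed. The sketch below imitates that guard with a stand-in class. FakeSubscription is hypothetical and only mirrors the behaviour described in this section; it accepts any non-None user, whereas the real method expects a StreamUser.

```python
class FakeSubscription:
    """Stand-in mimicking the documented listener guard; not the library class."""
    def __init__(self):
        self.listener = None

    def create_listener(self, user):
        # Assumption: any non-None object is accepted here in place of a StreamUser.
        if user is None:
            raise RuntimeError('user is not a StreamUser')
        self.listener = object()

    def consume_messages(self, callback=None, maximum_messages=None):
        # Documented behaviour: consuming without a listener raises RuntimeError.
        if self.listener is None:
            raise RuntimeError('listener is not yet initialized')
        return 'consuming'


subscription = FakeSubscription()

# Consuming before the listener exists fails.
try:
    subscription.consume_messages()
except RuntimeError as err:
    first_attempt = str(err)

# Creating the listener first makes the same call succeed.
subscription.create_listener(user=object())
second_attempt = subscription.consume_messages()
```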
- delete(headers=None) -> bool
Delete a subscription from a given stream.
Allows a user to delete a subscription to a given stream.
- Parameters
headers (dict) – headers containing the token or access key used for authorization
- Return type
bool indicating whether the subscription was completely deleted
- Raises
RuntimeError – when the API returns an unexpected response
- id = None
- listener = None
- stream_id = None
- subscription_type = None
Listener
- class factiva.news.stream.Listener(subscription_id=None, stream_user=None)
Bases: object
Represent a Listener for Google Pub/Sub.
- Parameters
stream_user (StreamUser) –
the constructor assigns a stream user that has access to the proper URL and headers, which are used for:
Checking the exceeded documents
Consuming messages (articles) synchronously
Consuming messages (articles) asynchronously
subscription_id (str) – used by Pub/Sub to consume messages synchronously or asynchronously
Examples
- Creating a new Listener directly:
>>> listener = Listener(
...     stream_user=StreamUser(
...         user_key='****************************1234',
...         request_info=False,
...         user_id='******-svcaccount@dowjones.com',
...         client_id='****************************5678',
...         password='*******'
...     )
... )
>>> def callback(message, subscription_id):
...     print('Subscription ID: {}: Message: {}'.format(
...         subscription_id, message
...     ))
...
>>> print(listener.listen(
...     callback,
...     subscription_id='<subscription-id>',
...     maximum_messages=10
... ))
Received news message with ID: DJDN******************
Subscription ID: dj-synhub-stream-********************-km******-filtered-******: Message: {'an': 'DJDN0*********************',
    'document_type': 'article', 'action': 'rep', 'source_code': 'DJDN',
    'source_name': 'Dow Jones ---- -----',
    'publication_date': '2021-05-20T08:00:10.255Z',
    'publication_datetime': '2021-05-20T08:00:10.255Z',
    'modification_date': '2021-05-20T08:04:56.175Z',
    'modification_datetime': '2021-05-20T08:02:54.000Z',
    'ingestion_datetime': '2021-05-20T08:00:13.000Z',
    'title': "----- ------ ------ ------ 2020", 'snippet': '',
    'body': "\nOn Thursday -- --- ----, --- Plc. announced its ----- ---- --- \n ...... }
- FIRST_OBJECT = 0
- check_exceeded_thread()
Create threads that check whether the document count has been exceeded.
- listen(callback=<function default_callback>, maximum_messages=None, batch_size=10, ack_enabled=False)
Listen synchronously.
Listens to the current messages (news) from a Pub/Sub subscription synchronously.
- Parameters
callback (function) – function used to process each message
maximum_messages (int) – number of messages to consume
batch_size (int) – maximum number of messages per batch
ack_enabled (boolean) – whether to acknowledge each message
- Raises
ValueError – when maximum_messages is undefined
GoogleAPICallError – when there is no valid instance to pull from, or when something unexpected happens with the Pub/Sub client
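One way to picture how maximum_messages and batch_size could interact is a loop that pulls at most batch_size messages per request until maximum_messages have been handled. The sketch below is an assumption about that interaction, not the library's implementation. pull_in_batches and fake_pull are hypothetical names, and a plain list stands in for the Pub/Sub subscription.

```python
def pull_in_batches(pull, callback, maximum_messages, batch_size=10):
    """Handle up to `maximum_messages` messages, at most `batch_size` per pull."""
    if maximum_messages is None:
        # Mirrors the documented ValueError for an undefined maximum_messages.
        raise ValueError('maximum_messages is undefined')
    handled = 0
    while handled < maximum_messages:
        # Never request more than what is still needed.
        want = min(batch_size, maximum_messages - handled)
        batch = pull(want)
        if not batch:
            break  # nothing left to read
        for message in batch:
            callback(message)
            handled += 1
    return handled


# A list of integers stands in for queued messages.
backlog = list(range(25))


def fake_pull(n):
    """Remove and return up to `n` items from the backlog."""
    taken = backlog[:n]
    del backlog[:n]
    return taken


seen = []
count = pull_in_batches(fake_pull, seen.append, maximum_messages=12, batch_size=5)
```

Here the twelve messages arrive in pulls of five, five, and two, and the remaining backlog is left untouched.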
- listen_async(callback=<function default_callback>, ack_enabled=False)
Listen asynchronously.
Listens to the current messages (news) from a Pub/Sub subscription asynchronously.
- Parameters
callback (function) – function used to process each message
ack_enabled (boolean) – whether to acknowledge each message
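When messages are consumed asynchronously, the callback may be invoked from background threads, as is common with streaming Pub/Sub clients; whether this library does so is an assumption here. Under that assumption, a callback that updates shared state should guard it with a lock. SafeCounter is a hypothetical helper, and plain threads simulate concurrent delivery.

```python
import threading


class SafeCounter:
    """Count messages per subscription under a lock so concurrent calls are safe."""
    def __init__(self):
        self._lock = threading.Lock()
        self.counts = {}

    def __call__(self, message, subscription_id):
        # The lock serializes the read-modify-write on the shared dict.
        with self._lock:
            self.counts[subscription_id] = self.counts.get(subscription_id, 0) + 1


counter = SafeCounter()

# Simulate concurrent delivery with plain threads (no Pub/Sub involved).
threads = [
    threading.Thread(target=counter, args=({'an': str(i)}, 'sub-1'))
    for i in range(20)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

An instance of such a class can be passed wherever a callback function is expected, since it is callable with the same two arguments.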
- property stream_id_uri
The stream id URI.