what to specify in params to access kinesis
lifion-kinesis
Lifion's Node.js client for Amazon Kinesis Data Streams.
Getting Started
To install the module:
npm install lifion-kinesis --save
The main module export is a Kinesis class that instantiates as a readable stream.
    const Kinesis = require('lifion-kinesis');

    const kinesis = new Kinesis({
      streamName: 'sample-stream'
      /* other options from AWS.Kinesis */
    });
    // Each incoming message is delivered through the 'data' event.
    kinesis.on('data', data => {
      console.log('Incoming data:', data);
    });
    kinesis.startConsumer();
To take advantage of back-pressure, the client can be piped to a writable stream:
    const { promisify } = require('util');
    const Kinesis = require('lifion-kinesis');
    const stream = require('stream');

    const asyncPipeline = promisify(stream.pipeline);
    const kinesis = new Kinesis({
      streamName: 'sample-stream'
      /* other options from AWS.Kinesis */
    });

    // The writable stream controls the pace at which data is consumed.
    asyncPipeline(
      kinesis,
      new stream.Writable({
        objectMode: true,
        write(data, encoding, callback) {
          console.log(data);
          callback();
        }
      })
    ).catch(console.error);
    kinesis.startConsumer();
Features
- Standard Node.js stream abstraction of Kinesis streams.
- Node.js implementation of the new enhanced fan-out feature.
- Optional auto-creation, encryption, and tagging of Kinesis streams.
- Support for a polling mode, using the GetRecords API, with automatic checkpointing.
- Support for multiple concurrent consumers through automatic assignment of shards.
- Support for sending messages to streams, with auto-retries.
API Reference
- lifion-kinesis
  - Kinesis ⇐ PassThrough ⏏
    - new Kinesis(options)
    - instance
      - .startConsumer() ⇒ Promise
      - .stopConsumer()
      - .putRecord(params) ⇒ Promise
      - .listShards(params) ⇒ Promise
      - .putRecords(params) ⇒ Promise
      - .getStats() ⇒ Object
    - static
      - .getStats() ⇒ Object
Kinesis ⇐ PassThrough ⏏
A pass-through stream class specialization implementing a consumer of Kinesis Data Streams using the AWS SDK for JavaScript. Incoming data can be retrieved through either the data event or by piping the instance to other streams.
Kind: Exported class
Extends: PassThrough
new Kinesis(options)
Initializes a new instance of the Kinesis client.
Param | Type | Default | Description |
---|---|---|---|
options | Object | | The initialization options. In addition to the options below, it can also contain any of the AWS.Kinesis options. |
[options.compression] | string | | The kind of data compression to use with records. The currently available compression options are either "LZ-UTF8" or none. |
[options.consumerGroup] | string | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
[options.createStreamIfNeeded] | boolean | true | Whether the Kinesis stream should be automatically created if it doesn't exist upon connection. |
[options.dynamoDb] | Object | {} | The initialization options for the DynamoDB client used to store the state of the consumers. In addition to tableName and tags, it can also contain any of the AWS.DynamoDB options. |
[options.dynamoDb.tableName] | string | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". |
[options.dynamoDb.tags] | Object | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
[options.encryption] | Object | | The encryption options to enforce in the stream. |
[options.encryption.type] | string | | The encryption type to use. |
[options.encryption.keyId] | string | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
[options.initialPositionInStream] | string | "LATEST" | The location in the shard from which the consumer will start fetching records when the application starts for the first time and there is no checkpoint for the shard. Set to LATEST to fetch new data only. Set to TRIM_HORIZON to start from the oldest available data record. |
[options.leaseAcquisitionInterval] | number | 20000 | The interval in milliseconds for how often to attempt lease acquisitions. |
[options.leaseAcquisitionRecoveryInterval] | number | 5000 | The interval in milliseconds for how often to re-attempt lease acquisitions when an error is returned from AWS. |
[options.limit] | number | 10000 | The limit of records per get records call (only applicable when useEnhancedFanOut is set to false). |
[options.logger] | Object | | An object with the warn, debug, and error functions that will be used for logging purposes. If not provided, logging will be omitted. |
[options.maxEnhancedConsumers] | number | 5 | The number of enhanced fan-out consumer ARNs that the module should initialize. Providing a number above the AWS limit (20) or below 1 will result in using the default. |
[options.noRecordsPollDelay] | number | 1000 | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when useEnhancedFanOut is set to false). |
[options.pollDelay] | number | 250 | When the usePausedPolling option is false, this option defines the delay in milliseconds between poll requests for more records (only applicable when useEnhancedFanOut is set to false). |
[options.s3] | Object | {} | The initialization options for the S3 client used to store large items in buckets. In addition to bucketName and endpoint, it can also contain any of the AWS.S3 options. |
[options.s3.bucketName] | string | | The name of the bucket in which to store large messages. If not provided, it defaults to the name of the Kinesis stream. |
[options.s3.largeItemThreshold] | number | 900 | The size in KB above which an item should automatically be stored in S3. |
[options.s3.nonS3Keys] | Array.<string> | [] | If the useS3ForLargeItems option is set to true, the nonS3Keys option lists the keys that will be sent normally on the Kinesis record. |
[options.s3.tags] | string | | If provided, the client will ensure that the S3 bucket is tagged with these tags. If the bucket already has tags, they will be merged. |
[options.shardCount] | number | 1 | The number of shards that the newly-created stream will use (if the createStreamIfNeeded option is set). |
[options.shouldDeaggregate] | string \| boolean | "auto" | Whether the method retrieving the records should expect aggregated records and deaggregate them appropriately. |
[options.shouldParseJson] | string \| boolean | "auto" | Whether retrieved records' data should be parsed as JSON or not. Set to "auto" to only attempt parsing if the data looks like JSON. Set to true to force data parsing. |
[options.statsInterval] | number | 30000 | The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running. |
options.streamName | string | | The name of the stream to consume data from (required). |
[options.supressThroughputWarnings] | boolean | false | Set to true to make the client log ProvisionedThroughputExceededException as debug rather than warning. |
[options.tags] | Object | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
[options.useAutoCheckpoints] | boolean | true | Set to true to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to false, consumers can use the setCheckpoint() function to store any sequence number as the checkpoint for the shard. |
[options.useAutoShardAssignment] | boolean | true | Set to true to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to false to make the client read from all shards. |
[options.useEnhancedFanOut] | boolean | false | Set to true to make the client use enhanced fan-out consumers to read from shards. |
[options.usePausedPolling] | boolean | false | Set to true to make the client not poll for more records until the consumer calls continuePolling(). This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when useEnhancedFanOut is set to false). |
[options.useS3ForLargeItems] | boolean | false | Whether to automatically use an S3 bucket to store large items or not. |
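As a rough illustration of how the options above combine, the sketch below builds an options object for a polling-mode consumer. The helper name `buildPollingOptions`, the group name, and the chosen values are assumptions made for illustration, not recommendations from the module; `region` is a standard AWS.Kinesis client option.

```javascript
// Hypothetical helper (not part of lifion-kinesis): builds the options
// object for a polling-mode consumer from the documented option names.
function buildPollingOptions(streamName, overrides = {}) {
  if (typeof streamName !== 'string' || streamName.length === 0) {
    throw new TypeError('options.streamName is required');
  }
  return {
    streamName,
    consumerGroup: 'sample-group', // assumed name; consumers in a group share shards
    initialPositionInStream: 'TRIM_HORIZON', // start from the oldest available record
    useEnhancedFanOut: false, // polling mode through GetRecords
    limit: 1000, // records per get records call
    pollDelay: 250, // ms between poll requests
    noRecordsPollDelay: 1000, // ms to wait after an empty poll
    ...overrides
  };
}

const options = buildPollingOptions('sample-stream', { region: 'us-east-1' });

// With the module installed, the object would be passed to the constructor:
// const Kinesis = require('lifion-kinesis');
// const kinesis = new Kinesis(options);
```

Keeping the options in a plain object like this makes it easy to vary them per environment before the client is instantiated.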
kinesis.startConsumer() ⇒ Promise
Starts the stream consumer by ensuring that the stream exists, that it's ready, and that it's configured as requested. The internal managers that deal with heartbeats, state, and consumers will also be started.
Kind: instance method of Kinesis
Fulfil: undefined
- Once the consumer has successfully started.
Reject: Error
- On any unexpected error while trying to start.
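A minimal sketch of the consumer lifecycle, assuming the application wants to stop consuming when the process receives a termination signal. The helper `runConsumer` and its `proc` parameter are hypothetical; only `startConsumer()` and `stopConsumer()` come from the module.

```javascript
// Hypothetical helper: starts the consumer and stops it on SIGINT/SIGTERM.
// `kinesis` is expected to be an instance created with `new Kinesis(options)`.
// `proc` defaults to the Node.js process and is injectable for testing.
async function runConsumer(kinesis, proc = process) {
  const shutdown = () => {
    kinesis.stopConsumer();
  };
  proc.once('SIGINT', shutdown);
  proc.once('SIGTERM', shutdown);
  try {
    // Resolves once the consumer has successfully started.
    await kinesis.startConsumer();
  } catch (err) {
    // Startup failed, so there is nothing to stop on a signal.
    proc.removeListener('SIGINT', shutdown);
    proc.removeListener('SIGTERM', shutdown);
    throw err;
  }
  return shutdown;
}
```

A caller would typically use it as `runConsumer(kinesis).catch(console.error)` after attaching the `data` listener or pipeline.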
kinesis.stopConsumer()
Stops the stream consumer. The internal managers will also be stopped.
Kind: instance method of Kinesis
kinesis.putRecord(params) ⇒ Promise
Writes a single data record into a stream.
Kind: instance method of Kinesis
Fulfil: Object
- The de-serialized data returned from the request.
Reject: Error
- On any unexpected error while writing to the stream.
Param | Type | Description |
---|---|---|
params | Object | The parameters. |
params.data | * | The data to put into the record. |
[params.explicitHashKey] | string | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
[params.partitionKey] | string | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
[params.sequenceNumberForOrdering] | string | Set this to the sequence number obtained from the last put record operation to guarantee strictly increasing sequence numbers, for puts from the same client and to the same partition key. If omitted, records are coarsely ordered based on arrival time. |
[params.streamName] | string | If provided, the record will be put into the specified stream instead of the stream name provided during the consumer instantiation. |
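The parameters above might be assembled as in the following sketch. The order-shaped payload and the helper names `toPutRecordParams` and `publishOrder` are hypothetical; the sketch only assumes that `kinesis` exposes `putRecord(params)` as documented here.

```javascript
// Hypothetical helper: maps an application object to putRecord params.
function toPutRecordParams(order) {
  return {
    data: order,
    // An explicit partition key keeps records for one customer on one shard;
    // if omitted, the key is derived from a hash of the data.
    partitionKey: String(order.customerId)
  };
}

// Hypothetical helper: sends one record through a Kinesis client instance.
async function publishOrder(kinesis, order) {
  return kinesis.putRecord(toPutRecordParams(order));
}
```

Separating the mapping from the call keeps the parameter shape easy to check without a live stream.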
kinesis.listShards(params) ⇒ Promise
List the shards of a stream.
Kind: instance method of Kinesis
Fulfil: Object
- The de-serialized data returned from the request.
Reject: Error
- On any unexpected error while writing to the stream.
Param | Type | Description |
---|---|---|
params | Object | The parameters. |
[params.streamName] | string | If provided, the method will list the shards of the specified stream instead of the stream name provided during the consumer instantiation. |
kinesis.putRecords(params) ⇒ Promise
Writes multiple data records into a stream in a single call.
Kind: instance method of Kinesis
Fulfil: Object
- The de-serialized data returned from the request.
Reject: Error
- On any unexpected error while writing to the stream.
Param | Type | Description |
---|---|---|
params | Object | The parameters. |
params.records | Array.<Object> | The records associated with the request. |
params.records[].data | * | The record data. |
[params.records[].explicitHashKey] | string | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
[params.records[].partitionKey] | string | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
[params.streamName] | string | If provided, the records will be put into the specified stream instead of the stream name provided during the consumer instantiation. |
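The AWS PutRecords API accepts at most 500 records per request, and this reference does not state whether the module splits larger batches itself. The sketch below assumes the caller does the splitting; `chunkRecords` and `publishAll` are hypothetical helpers, and the `id` field used as a partition key is an assumption about the payload.

```javascript
// Hypothetical helper: splits an array into batches of at most `size` items.
function chunkRecords(records, size = 500) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < records.length; i += size) {
    batches.push(records.slice(i, i + size));
  }
  return batches;
}

// Hypothetical helper: sends all items through putRecords, one batch at a time.
async function publishAll(kinesis, items) {
  const records = items.map(item => ({
    data: item,
    partitionKey: String(item.id)
  }));
  const results = [];
  for (const batch of chunkRecords(records)) {
    // Batches are awaited sequentially so only one request is in flight.
    results.push(await kinesis.putRecords({ records: batch }));
  }
  return results;
}
```

Sending batches sequentially is the conservative choice here; parallel sends would raise throughput at the cost of a higher chance of throttling.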
kinesis.getStats() ⇒ Object
Returns statistics for the instance of the client.
Kind: instance method of Kinesis
Returns: Object
- An object with the statistics.
Kinesis.getStats() ⇒ Object
Returns the aggregated statistics of all the instances of the client.
Kind: static method of Kinesis
Returns: Object
- An object with the statistics.
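The two `getStats()` variants can be read side by side, as in this sketch. The helper `reportStats` is hypothetical, and the shape of the returned statistics objects is not described in this reference, so the sketch only serializes whatever is returned.

```javascript
// Hypothetical helper: collects instance-level and aggregated statistics.
// `KinesisClass` is the module export; `kinesis` is one of its instances.
function reportStats(KinesisClass, kinesis, log = console.log) {
  const snapshot = {
    instance: kinesis.getStats(), // statistics for this client only
    all: KinesisClass.getStats() // aggregated across all client instances
  };
  log(JSON.stringify(snapshot));
  return snapshot;
}
```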
License
MIT
Source: https://github.com/lifion/lifion-kinesis