Step 1: Set up required resources

In this step you create the following resources that you need for this getting-started scenario:

• An S3 bucket to serve as the destination that receives data from the connector.

• An MSK cluster to which you will send data. The connector will then read the data from this cluster and send it to the destination S3 bucket.

• An IAM role that allows the connector to write to the destination S3 bucket.

• An Amazon VPC endpoint to make it possible to send data from the Amazon VPC that has the cluster and the connector to Amazon S3.

To create the S3 bucket

1. Sign in to the AWS Management Console and open the Amazon S3 console at https://console.aws.amazon.com/s3/.

2. Choose Create bucket.

3. Enter mkc-tutorial-destination-bucket for the name of the bucket.

4. Scroll down and choose Create bucket.

5. In the list of buckets, choose the newly created mkc-tutorial-destination-bucket.

6. Choose Create folder.

7. Enter tutorial for the name of the folder, then scroll down and choose Create folder.

To create the cluster

1. Open the Amazon MSK console at https://console.aws.amazon.com/msk/home?region=us-east-1#/home/.

2. In the left pane, under MSK Clusters, choose Clusters.

3. Choose Create cluster.

4. Choose Custom create.

5. For the cluster name enter mkc-tutorial-cluster.

6. Under Networking, choose an Amazon VPC, set the Number of Zones to 2, then select the Availability Zones and subnets that you want to use. Remember the IDs of the Amazon VPC and subnets that you selected because we need them later in this tutorial.

7. Under Access control methods ensure that only Unauthenticated access is selected.

8. Under Encryption ensure that only Plaintext is selected.

9. Scroll down and choose Create cluster. This takes you to the cluster's details page. On that page, look for the security group ID under Security groups applied. Remember that ID because we need it later in this tutorial.

To create the IAM role that can write to the destination bucket

1. Open the IAM console at https://console.aws.amazon.com/iam/.

2. In the left pane, under Access management, choose Roles.

3. Choose Create role.

4. Under Or select a service to view its use cases, choose S3.

5. Scroll down and under Select your use case, again choose S3.

6. Choose Next: Permissions.

7. Choose Create policy. This opens a new tab in your browser where you will create the policy. Leave the original role-creation tab open because we'll get back to it later.

8. Choose the JSON tab, then replace the text in the window with the following policy.

{
  "Version":"2012-10-17",
  "Statement":[
    {
      "Effect":"Allow",
      "Action":[
        "s3:ListAllMyBuckets"
      ],
      "Resource":"arn:aws:s3:::*"
    },
    {
      "Effect":"Allow",
      "Action":[
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource":"arn:aws:s3:::mkc-tutorial-destination-bucket"
    },
    {
      "Effect":"Allow",
      "Action":[
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource":"*"
    }
  ]
}

9. Choose Next: Tags.

10. Choose Next: Review.

11. Enter mkc-tutorial-policy for the policy name, then scroll down and choose Create policy.

12. Back in the browser tab where you were creating the role, choose the refresh button.

13. Find the mkc-tutorial-policy and select it by choosing the button to its left.

14. Choose Next: Tags.

15. Choose Next: Review.

16. Enter mkc-tutorial-role for the role name, and delete the text in the description box.

17. Choose Create role.
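
If you prefer to generate the permissions policy for a different bucket name instead of editing the JSON by hand, the following is a minimal Python sketch. It assumes that the first statement uses the same "Effect":"Allow" form as the other two. The function name build_sink_policy is hypothetical and is not part of any AWS SDK. The code calls no AWS API and only produces JSON text that you could paste into the JSON tab.

```python
import json


def build_sink_policy(bucket_name: str) -> dict:
    """Return the tutorial's permissions policy for the given bucket.

    Hypothetical helper: it mirrors the three statements of the tutorial
    policy and only substitutes the bucket name into the bucket ARN.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets the role list the buckets in the account.
                "Effect": "Allow",
                "Action": ["s3:ListAllMyBuckets"],
                "Resource": "arn:aws:s3:::*",
            },
            {
                # Bucket-level actions, scoped to the destination bucket.
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": f"arn:aws:s3:::{bucket_name}",
            },
            {
                # Object-level actions, left unscoped as in the tutorial.
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads",
                ],
                "Resource": "*",
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(build_sink_policy("mkc-tutorial-destination-bucket"), indent=2))
```

Outside a tutorial you would probably narrow the third statement's "Resource" to the objects in the destination bucket; the sketch keeps the wildcard only to match the policy shown in this procedure.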

To allow MSK Connect to assume the role

1. In the IAM console, in the left pane, under Access management, choose Roles.

2. Find the mkc-tutorial-role and choose it.

3. Under the role's Summary, choose the Trust relationships tab.

4. Choose Edit trust relationship.

5. Replace the existing trust policy with the following JSON.

{
  "Version":"2012-10-17",
  "Statement":[
    {
      "Effect":"Allow",
      "Principal":{
        "Service":"kafkaconnect.amazonaws.com"
      },
      "Action":"sts:AssumeRole"
    }
  ]
}

6. Choose Update Trust Policy.
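
Before you paste a trust policy into the console, it can help to confirm that it names the MSK Connect service principal. The following Python sketch is one way to check this locally. The helper service_can_assume is hypothetical, inspects only the JSON text, and does not contact IAM. The policy text in the sketch assumes the standard trust-policy layout (Version, Statement, Effect, Principal).

```python
import json

# Trust policy text, assumed to follow the standard IAM trust-policy layout.
TRUST_POLICY = """
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {"Service": "kafkaconnect.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }
  ]
}
"""


def _as_list(value):
    """IAM allows a single string or a list; normalize to a list."""
    return [value] if isinstance(value, str) else list(value)


def service_can_assume(policy_json: str, service: str) -> bool:
    """Return True if an Allow statement lets `service` call sts:AssumeRole."""
    policy = json.loads(policy_json)
    for statement in policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        principal = statement.get("Principal", {})
        if not isinstance(principal, dict):
            continue  # for example "*"; not handled by this sketch
        actions = _as_list(statement.get("Action", []))
        services = _as_list(principal.get("Service", []))
        if "sts:AssumeRole" in actions and service in services:
            return True
    return False


if __name__ == "__main__":
    print(service_can_assume(TRUST_POLICY, "kafkaconnect.amazonaws.com"))
```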

To create an Amazon VPC endpoint from the cluster's VPC to Amazon S3

1. Open the Amazon VPC console at https://console.aws.amazon.com/vpc/.

2. In the left pane, choose Endpoints.

3. Choose Create endpoint.

4. Under Service Name choose the com.amazonaws.us-east-1.s3 service and the Gateway type.

5. Choose the cluster's VPC and then select the box to the left of the route table that is associated with the cluster's subnets.

6. Choose Create endpoint.

Next Step

Step 2: Create custom plugin (p. 56)

Step 2: Create custom plugin

A plugin contains the code that defines the logic of the connector. In this step you create a custom plugin that has the code for the Lenses Amazon S3 Sink Connector. In a later step, when you create the MSK connector, you specify that its code is in this custom plugin. You can use the same plugin to create multiple MSK connectors with different configurations.

To create the custom plugin

1. Download the Lenses Amazon S3 Sink Connector JAR.

2. Compress the downloaded JAR file to turn it into a ZIP file.

3. Upload the ZIP file to an S3 bucket to which you have access. For information on how to upload files to Amazon S3, see Uploading objects in the Amazon S3 user guide.

4. Open the Amazon MSK console at https://console.aws.amazon.com/msk/.

5. In the left pane expand MSK Connect, then choose Custom plugins.

6. Choose Create custom plugin.

7. Choose Browse S3.

8. In the list of buckets find the bucket where you uploaded the ZIP file, and choose that bucket.

9. In the list of objects in the bucket, select the radio button to the left of the ZIP file, then choose the button labeled Choose.

10. Enter mkc-tutorial-plugin for the custom plugin name, then choose Create custom plugin.

It might take AWS a few minutes to finish creating the custom plugin. When the creation process is complete, you see the following message in a banner at the top of the browser window.

Custom plugin mkc-tutorial-plugin was successfully created

The custom plugin was created. You can now create a connector using this custom plugin.
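
The compression in step 2 of the procedure above can also be scripted. The following is a minimal Python sketch that uses the standard-library zipfile module. The function name zip_jar and the file names in the usage comment are hypothetical placeholders, not names taken from the Lenses download.

```python
import zipfile
from pathlib import Path


def zip_jar(jar_path: str, zip_path: str) -> str:
    """Wrap a single JAR file in a ZIP archive and return the ZIP path.

    The JAR is stored at the top level of the archive under its own
    file name, without any parent directories.
    """
    jar = Path(jar_path)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.write(jar, arcname=jar.name)
    return zip_path


# Example usage (placeholder file names):
#   zip_jar("my-connector.jar", "my-connector.zip")
```

You would then upload the resulting ZIP file to your S3 bucket as described in step 3.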

Next Step

Step 3: Create client machine and Apache Kafka topic (p. 56)

Step 3: Create client machine and Apache Kafka topic

To create a client instance

1. Open the Amazon EC2 console at https://console.aws.amazon.com/ec2/.

2. Choose Launch instance.

3. Choose Select to create an instance of Amazon Linux 2 AMI (HVM), SSD Volume Type.

4. Choose the t2.xlarge instance type by selecting the check box next to it.

5. Choose Next: Configure Instance Details.

6. In the Network list, choose the same VPC whose ID you saved when you created the cluster in the section called “Step 1: Set up required resources” (p. 53).

7. In the Auto-assign Public IP list, choose Enable.

8. In the menu near the top, choose 5. Add Tags.

9. Choose Add Tag.

10. Enter Name for the Key and mkc-tutorial-client for the Value.

11. Choose Review and Launch, and then choose Launch.

12. In the first list, choose the option to Create a new key pair, enter mkc-tutorial-key-pair for Key pair name, and then choose Download Key Pair. Alternatively, you can use an existing key pair if you prefer.

13. Choose Launch Instances.

14. In the bottom right part of the screen, choose View Instances.

15. In the list of instances, find mkc-tutorial-client. Choose it by selecting the box to its left. Make sure no other instances are also selected.

16. In the bottom half of the screen, choose the Security tab.

17. Under Security groups copy the ID of the security group. We use it in the following procedure.

To allow the newly created client to send data to the cluster

1. Open the Amazon VPC console at https://console.aws.amazon.com/vpc/.

2. In the left pane, under SECURITY, choose Security Groups. In the Security group ID column, find the security group of the cluster. You saved the ID of this security group when you created the cluster in the section called “Step 1: Set up required resources” (p. 53). Choose this security group by selecting the box to the left of its row. Make sure no other security groups are simultaneously selected.

3. In the bottom half of the screen, choose the Inbound rules tab.

4. Choose Edit inbound rules.

5. In the bottom left of the screen, choose Add rule.

6. In the new rule, choose All traffic in the Type column. In the field to the right of the Source column, enter the ID of the security group of the client instance. This is the security group ID that you saved after you created the client in the previous procedure.

7. Choose Save rules. Your MSK cluster will now accept all traffic from the client you created in the previous procedure.

To create a topic

1. Open the Amazon EC2 console at https://console.aws.amazon.com/ec2/.

2. In the table of instances choose mkc-tutorial-client.

3. Near the top of the screen, choose Connect, then follow the instructions to connect to the instance.

4. Install Java on the client instance by running the following command:

sudo yum install java-1.8.0

5. Run the following command to download Apache Kafka.

wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz

Note

If you want to use a mirror site other than the one used in this command, you can choose a different one on the Apache website.

6. Run the following command in the directory where you downloaded the TAR file in the previous step.

tar -xzf kafka_2.12-2.2.1.tgz

7. Go to the kafka_2.12-2.2.1 directory.

8. Open the Amazon MSK console at https://console.aws.amazon.com/msk/home?region=us-east-1#/home/.

9. In the left pane choose Clusters, then choose the name mkc-tutorial-cluster.

10. Choose View client information.

11. Copy the Apache ZooKeeper connection string that's under the label Plaintext. Also copy the bootstrap servers string. You need both of these strings in the following steps.

12. Choose Done.

13. Run the following command on the client instance (mkc-tutorial-client), replacing ZookeeperConnectString with the value that you saved when you viewed the cluster's client information.

bin/kafka-topics.sh --create --zookeeper ZookeeperConnectString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic

If the command succeeds, you see the following message: Created topic mkc-tutorial-topic.

Next Step

Step 4: Create connector (p. 58)

Step 4: Create connector

To create the connector

1. Sign in to the AWS Management Console, and open the Amazon MSK console at https://console.aws.amazon.com/msk/home?region=us-east-1#/home/.

2. In the left pane, expand MSK Connect, then choose Connectors.

3. Choose Create connector.

4. In the list of plugins, choose mkc-tutorial-plugin, then choose Next.

5. For the connector name enter mkc-tutorial-connector.

6. In the list of clusters, choose mkc-tutorial-cluster.

7. Copy the following configuration and paste it into the connector configuration field.

key.converter.schemas.enable=false
connect.s3.kcql=INSERT INTO mkc-tutorial-destination-bucket:tutorial SELECT * FROM mkc-tutorial-topic
aws.region=us-east-1
tasks.max=2
topics=mkc-tutorial-topic
schema.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter

8. Under Access permissions choose mkc-tutorial-role.

9. Choose Next. On the Security page, choose Next again.

10. On the Logs page choose Next.

11. Under Review and create choose Create connector.

Next Step

Step 5: Send data (p. 59)

Step 5: Send data

In this step you send data to the Apache Kafka topic that you created earlier, and then look for that same data in the destination S3 bucket.

To send data to the MSK cluster

1. In the bin folder of the Apache Kafka installation on the client instance, create a text file named client.properties with the following contents.

security.protocol=PLAINTEXT

2. Run the following command in the bin folder, replacing BootstrapBrokerString with the bootstrap servers string that you copied when you viewed the cluster's client information.

./kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic

3. Enter any message that you want, and press Enter. Repeat this step two or three times. Every time you enter a line and press Enter, that line is sent to your Apache Kafka cluster as a separate message.

4. Look in the destination Amazon S3 bucket to find the messages that you sent in the previous step.

Connectors

A connector integrates external systems and Amazon services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or continuously copying data from your cluster into a data sink. A connector can also perform lightweight logic such as transformation, format conversion, or filtering data before delivering the data to a destination. Source connectors pull data from a data source and push this data into the cluster, while sink connectors pull data from the cluster and push this data into a data sink.

The following diagram shows the architecture of a connector. A worker is a Java virtual machine (JVM) process that runs the connector logic. Each worker creates a set of tasks that run in parallel threads and do the work of copying the data. Tasks don't store state, and can therefore be started, stopped, or restarted at any time in order to provide a resilient and scalable data pipeline.

Connector capacity

The total capacity of a connector depends on the number of workers that the connector has, as well as on the number of MSK Connect Units (MCUs) per worker. Each MCU represents 1 vCPU of compute and 4 GiB of memory. To create a connector, you must choose one of the following two capacity modes.

• Provisioned: Choose this mode if you know the capacity requirements for your connector. You specify two values:

• The number of workers.

• The number of MCUs per worker.

• Auto scaled: Choose this mode if the capacity requirements for your connector are variable or if you don't know them in advance. You specify three sets of values:

• The minimum and maximum number of workers.

• The scale-in and scale-out percentages for CPU utilization, which is determined by the CpuUtilization metric. When the CpuUtilization metric for the connector exceeds the scale-out percentage, MSK Connect increases the number of workers. When the metric falls below the scale-in percentage, MSK Connect decreases the number of workers. The number of workers always remains within the minimum and maximum numbers that you specify when you create the connector.

• The number of MCUs per worker.

For more information about workers, see the section called “Workers” (p. 62). To learn about MSK Connect metrics, see the section called “Monitoring” (p. 73).
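
The capacity arithmetic above can be sketched in a few lines of Python. The sketch computes total vCPUs and memory from the number of workers and MCUs per worker, and illustrates how a CPU utilization reading could be compared against scale-in and scale-out percentages. All names are hypothetical. The one-worker step per evaluation is an assumption made for illustration and is not documented MSK Connect behavior.

```python
from dataclasses import dataclass
from typing import Tuple

VCPUS_PER_MCU = 1        # each MCU represents 1 vCPU
MEMORY_GIB_PER_MCU = 4   # each MCU represents 4 GiB of memory


@dataclass(frozen=True)
class AutoScaledCapacity:
    """The three sets of values you specify in auto scaled mode."""
    min_workers: int
    max_workers: int
    scale_in_cpu_percent: float
    scale_out_cpu_percent: float
    mcus_per_worker: int


def total_capacity(workers: int, mcus_per_worker: int) -> Tuple[int, int]:
    """Return (total vCPUs, total memory in GiB) for a connector."""
    mcus = workers * mcus_per_worker
    return mcus * VCPUS_PER_MCU, mcus * MEMORY_GIB_PER_MCU


def desired_workers(current: int, cpu_percent: float,
                    capacity: AutoScaledCapacity) -> int:
    """Illustrative scaling decision.

    Assumption: the worker count changes by one per evaluation. The
    result is always clamped to the configured minimum and maximum.
    """
    if cpu_percent > capacity.scale_out_cpu_percent:
        current += 1
    elif cpu_percent < capacity.scale_in_cpu_percent:
        current -= 1
    return max(capacity.min_workers, min(capacity.max_workers, current))


if __name__ == "__main__":
    vcpus, memory_gib = total_capacity(workers=2, mcus_per_worker=4)
    print(f"{vcpus} vCPUs, {memory_gib} GiB")
```

For example, a provisioned connector with 2 workers and 4 MCUs per worker has 8 MCUs in total, which corresponds to 8 vCPUs and 32 GiB of memory.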

Creating a connector

Creating a connector using the AWS Management Console

1. Open the Amazon MSK console at https://console.aws.amazon.com/msk/.

2. In the left pane, under MSK Connect, choose Connectors.

3. Choose Create connector.

4. You can choose between using an existing custom plugin to create the connector, or creating a new custom plugin first. For information on custom plugins and how to create them, see the section called “Plugins” (p. 62). In this procedure, let's assume you have a custom plugin that you want to use. In the list of custom plugins, find the one that you want to use, and select the box to its left, then choose Next.

5. Enter a name and, optionally, a description.

6. Choose the cluster that you want to connect to.

7. Specify the connector configuration. The configuration parameters that you need to specify depend on the type of connector that you want to create. However, some parameters are common to all connectors, for example, the connector.class and tasks.max parameters. The following is an example configuration for the Confluent Amazon S3 Sink Connector.

connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=2
topics=my-example-topic
s3.region=us-east-1
s3.bucket.name=my-destination-bucket
flush.size=1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
schema.compatibility=NONE

8. Next, you configure your connector capacity. You can choose between two capacity modes: provisioned and auto scaled. For information about these two options, see the section called “Capacity” (p. 60).

9. Choose either the default worker configuration or a custom worker configuration. For information about creating custom worker configurations, see the section called “Workers” (p. 62).

10. Next, you specify the service execution role. This must be an IAM role that MSK Connect can assume, and that grants the connector all the permissions that it needs to access the necessary AWS resources. Those permissions depend on the logic of the connector. For information about how to create this role, see the section called “Service Execution Role” (p. 65).

11. Choose Next, review the security information, then choose Next again.

12. Specify the logging options that you want, then choose Next. For information about logging, see the section called “Logging” (p. 72).

13. Choose Create connector.

To use the MSK Connect API to create a connector, see CreateConnector.
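
When you work with connector configurations outside the console, it is often convenient to hold the key=value lines as a dictionary. The following Python sketch parses configuration text in the format shown in step 7 and reports whether the two parameters that the procedure names as common to all connectors (connector.class and tasks.max) are present. The function names are hypothetical. Treating those two parameters as mandatory is an assumption of this sketch.

```python
from typing import Dict, List

# Parameters the procedure above describes as common to all connectors.
COMMON_KEYS = ("connector.class", "tasks.max")


def parse_connector_config(text: str) -> Dict[str, str]:
    """Parse key=value lines into a dictionary.

    Blank lines and lines that start with '#' are skipped. Only the
    first '=' separates the key from the value, so values may contain
    '=' characters themselves.
    """
    config: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        if not separator:
            raise ValueError(f"not a key=value line: {line!r}")
        config[key.strip()] = value.strip()
    return config


def missing_common_keys(config: Dict[str, str]) -> List[str]:
    """Return the common parameters that are absent from the config."""
    return [key for key in COMMON_KEYS if key not in config]


if __name__ == "__main__":
    example = """
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=2
    topics=my-example-topic
    """
    parsed = parse_connector_config(example)
    print(parsed, missing_common_keys(parsed))
```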

Plugins

A plugin is an AWS resource that contains the code that defines your connector logic. You upload a JAR file (or a ZIP file that contains one or more JAR files) to an S3 bucket, and specify the location of the bucket when you create the plugin. When you create a connector, you specify the plugin that you want MSK Connect to use for it. The relationship of plugins to connectors is one-to-many: You can create one or more connectors from the same plugin.

For information on how to develop the code for a connector, see the Connector Development Guide in the Apache Kafka documentation.

Creating a custom plugin using the AWS Management Console

1. Open the Amazon MSK console at https://console.aws.amazon.com/msk/.

2. In the left pane, under MSK Connect, choose Custom plugins.

3. Choose Create custom plugin.

4. Choose Browse S3.

5. In the list of S3 buckets, choose the bucket that has the JAR or ZIP file for the plugin.

6. In the list of objects, select the box to the left of the JAR or ZIP file for the plugin, then choose Choose.

7. Choose Create custom plugin.

To use the MSK Connect API to create a custom plugin, see CreateCustomPlugin.

Workers

A worker is a Java virtual machine (JVM) process that runs the connector logic. Each worker creates a set of tasks that run in parallel threads and do the work of copying the data. Tasks don't store state, and can therefore be started, stopped, or restarted at any time in order to provide a resilient and scalable data pipeline. Changes to the number of workers, whether due to a scaling event or due to unexpected failures, are automatically detected by the remaining workers. They coordinate to rebalance tasks across the set of remaining workers. Connect workers use Apache Kafka's consumer groups to coordinate and rebalance.

If your connector's capacity requirements are variable or difficult to estimate, you can let MSK Connect scale the number of workers as needed between a lower limit and an upper limit that you specify.

Alternatively, you can specify the exact number of workers that you want to run your connector logic. For more information, see the section called “Capacity” (p. 60).
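
To make the idea of rebalancing concrete, the following Python sketch redistributes a fixed set of tasks whenever the set of workers changes. It is an illustration only. The round-robin assignment and the function name assign_tasks are assumptions of this sketch, and the coordination protocol that Kafka Connect workers actually use is more involved.

```python
from typing import Dict, List


def assign_tasks(task_ids: List[int], workers: List[str]) -> Dict[str, List[int]]:
    """Spread tasks across workers in round-robin order.

    Because tasks don't store state, any task can be moved to any
    worker, so a full reassignment is always possible.
    """
    if not workers:
        raise ValueError("at least one worker is required")
    assignment: Dict[str, List[int]] = {worker: [] for worker in workers}
    for index, task_id in enumerate(task_ids):
        assignment[workers[index % len(workers)]].append(task_id)
    return assignment


if __name__ == "__main__":
    tasks = [0, 1, 2, 3]
    print(assign_tasks(tasks, ["worker-a", "worker-b"]))
    # After worker-b stops, the remaining worker takes over every task.
    print(assign_tasks(tasks, ["worker-a"]))
```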
