Monday, March 19, 2012

Blocking with RECEIVE and SqlDataReader for real-time updates

I have this scenario working fine, but I would like to know whether others have tried it or can recommend a better approach. Below is a brief description; the code should explain it fully:

A process updates a SQL Server table via a stored proc, which in turn writes information to a Service Broker target queue. In my client/server architecture, I would like clients to see this new information as soon as it is written to the target queue. To do this, I have a WCF service that calls a stored procedure; the stored proc contains an infinite loop with a RECEIVE statement that has no timeout. Once RECEIVE reads data from the SB queue, the data is retrieved via a SqlDataReader and the results are sent to clients via pub/sub. The reader then blocks until RECEIVE returns the next message, and so on.

Stored Proc

Code Snippet

-- ...
WHILE 1 = 1
BEGIN
    -- Param declarations

    ...

    BEGIN TRANSACTION;
    WAITFOR
    (
        -- Blocks while TargetQueue is empty
        RECEIVE TOP(1)
            @conversation_handle = conversation_handle,
            @message_type_name = message_type_name,
            @conversation_group_id = conversation_group_id,
            @message_body =
                CASE
                    WHEN validation = 'X' THEN CAST(message_body AS XML)
                    ELSE CAST(N'' AS XML)
                END
        FROM [dbo].[TargetQueue] -- No timeout!
    );

    -- Handle errors
    ...

    -- Return received information. After this statement is executed,
    -- reader.NextResult() unblocks and reader.Read() can read these
    -- new values
    SELECT 'Conversation Group Id' = @conversation_group_id,
           'Conversation Handle' = @conversation_handle,
           'Message Type Name' = @message_type_name,
           'Message Body' = @message_body;

    COMMIT TRANSACTION;

END -- WHILE

C# Code

Code Snippet

// Create a SqlCommand, and initialize it to execute the above stored proc
// (set the command timeout to zero!)
...

// ExecuteReader blocks until RECEIVE reads data from the target queue
using (SqlDataReader reader = cmd.ExecuteReader())
{
    // Begin an infinite loop
    while (true)
    {
        // Process data retrieved by RECEIVE
        while (reader.Read())
        {
            Trace.WriteLine("Conversation Group Id: " + reader.GetGuid(0) +
                ", Conversation Handle: " + reader.GetGuid(1) +
                ", Message Type Name: " + reader.GetString(2) +
                ", Message Body: " + reader.GetString(3));
            // Send data via pub/sub
            ...
        }

        // Blocks until the stored procedure returns another result set,
        // i.e., blocks until RECEIVE unblocks and retrieves more data from the queue.
        // NextResult returns false once the procedure exits, so stop looping then.
        if (!reader.NextResult())
        {
            break;
        }
    } // while
}

There isn't anything wrong with your approach from what you show, but I do have a few questions:

Who ends the dialogs and how? The RECEIVE loop has to be prepared to process the EndDialog and Error messages, otherwise you will leak conversations and your database will grow indefinitely. Also, the stored procedure you mention: how does it find the dialog to send on, and how does it end it?

Are the notifications important? Can you afford to lose one? You will lose notifications because WCF cannot guarantee reliable delivery.

What scale are you talking about? One single RECEIVE loop with TOP(1) will have problems if you expect thousands of notifications per second.

|||

Remus,

Thanks for your thoughtful questions. My comments are below:

>What scale are you talking about? One single RECEIVE loop with TOP(1) will have problems if you expect thousands of notifications per second

Currently there will be fewer than 100 (one hundred) notifications per day. It is likely, however, that these notifications will come in bursts, as the process that ultimately populates the target queue (via uspSendMessage below) kicks in multiple times during the day. I can remove TOP(1) if necessary.

>Are the notifications important? Can you afford to lose one? You will lose notifications because WCF cannot guarantee reliable delivery

Yes, notifications to clients are very important, as they will be used for trade position management. With WCF I am planning on using MSMQ to guarantee delivery to clients. Is this what you meant?

> Who ends the dialogs and how? how does the stored proc find the dialog to send on and how does it end it?

I have two stored procs. uspSendMessage updates a database table with new information, then does BEGIN DIALOG, SEND ON CONVERSATION, END CONVERSATION and COMMIT TRAN. uspReceiveMessage, on the other hand, does a RECEIVE (it does not specify a conversation group id, in order to retrieve all data), a SELECT to send the data to the SqlDataReader, END CONVERSATION, and then COMMIT TRAN.

Both stored procs, taken from my test harness, are shown below:

Code Snippet

CREATE PROCEDURE uspSendMessage
    -- Add the parameters for the stored procedure here
    @CompanyName nvarchar(40),
    @PhoneName nvarchar(24)
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON;

    BEGIN TRY
        -- Begin a transaction.
        BEGIN TRANSACTION;

        -- Insert data
        INSERT INTO Shippers (CompanyName, Phone) VALUES (@CompanyName, @PhoneName);

        -- Create the message.
        DECLARE @message XML;
        SET @message = N'<Shipper><CompanyName>' + @CompanyName + '</CompanyName><Phone>' + @PhoneName + '</Phone></Shipper>';

        -- Declare a variable to hold the conversation handle, and then start the conversation
        DECLARE @conversationHandle UNIQUEIDENTIFIER;

        BEGIN DIALOG CONVERSATION @conversationHandle
            FROM SERVICE InitiatorService
            TO SERVICE 'TargetService'
            ON CONTRACT NewShipperContract
            WITH ENCRYPTION = OFF;

        -- Send the message on the dialog.
        SEND ON CONVERSATION @conversationHandle MESSAGE TYPE NewShipperMessage (@message);

        -- End the conversation.
        END CONVERSATION @conversationHandle;

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        SELECT error_number() AS ErrorNum, error_message() AS ErrorMSg;

        -- Test XACT_STATE
        IF (XACT_STATE()) = -1 -- Uncommittable transaction state
        BEGIN
            ROLLBACK TRANSACTION;
        END;

        IF (XACT_STATE()) = 1 -- Transaction active
        BEGIN
            COMMIT TRANSACTION;
        END;
    END CATCH
END

Code Snippet

CREATE PROCEDURE uspReceiveMessage
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON;

    WHILE 1 = 1
    BEGIN
        DECLARE @conversation_handle UNIQUEIDENTIFIER,
                @conversation_group_id UNIQUEIDENTIFIER,
                @message_body XML,
                @message_type_name NVARCHAR(128);

        BEGIN TRANSACTION;

        WAITFOR
        (
            RECEIVE TOP(1)
                @conversation_handle = conversation_handle,
                @message_type_name = message_type_name,
                @conversation_group_id = conversation_group_id,
                @message_body =
                    CASE
                        WHEN validation = 'X' THEN CAST(message_body AS XML)
                        ELSE CAST(N'<none/>' AS XML)
                    END
            FROM [dbo].[TargetQueue]
        );

        -- Handle error
        IF @@ERROR <> 0
        BEGIN
            COMMIT TRANSACTION;
            BREAK;
        END

        -- Show received information
        SELECT 'Conversation Group Id' = @conversation_group_id,
               'Conversation Handle' = @conversation_handle,
               'Message Type Name' = @message_type_name,
               'Message Body' = @message_body;

        -- If the message_type_name indicates that the message is an error
        -- or an end dialog message, end the conversation.
        IF @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
            OR @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
        BEGIN
            END CONVERSATION @conversation_handle;
        END;

        COMMIT TRANSACTION;
    END -- WHILE
END

|||

Yazan Diranieh wrote:

>What scale are you talking about? One single RECEIVE loop with TOP(1) will have problems if you expect thousands of notifications per second

Currently there will be fewer than 100 (one hundred) notifications per day. It is likely, however, that these notifications will come in bursts, as the process that ultimately populates the target queue (via uspSendMessage below) kicks in multiple times during the day. I can remove TOP(1) if necessary.

If you do hit scalability problems, removing TOP(1) will not help: you are sending only one message per dialog anyway, so RECEIVE can only return one message at a time. See http://blogs.msdn.com/remusrusanu/archive/2007/04/24/reusing-conversations.aspx and http://blogs.msdn.com/remusrusanu/archive/2007/05/02/recycling-conversations.aspx on how to send more than one message per dialog. I wouldn't focus on this at the moment, though.

Yazan Diranieh wrote:

>Are the notifications important? Can you afford to lose one? You will lose notifications because WCF cannot guarantee reliable delivery

Yes, notifications to clients are very important, as they will be used for trade position management. With WCF I am planning on using MSMQ to guarantee delivery to clients. Is this what you meant?

As it is now, if the process crashes after the RECEIVE was committed but before the WCF send occurred (or reached the outgoing MSMQ queue), your notification is lost. You must use a distributed transaction and enlist both the SqlCommand.ExecuteReader and the WCF send in the same transaction context. This also means you cannot use one infinitely executing batch; instead, loop in CLR code and call one RECEIVE at a time.
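As a rough illustration of the "one RECEIVE at a time" idea, a single-shot variant of uspReceiveMessage might look like the sketch below. The procedure name uspReceiveOneMessage and the 5-second timeout are assumptions, not part of the original code. It deliberately opens no transaction of its own, on the assumption that the caller runs it inside the same distributed transaction as the WCF/MSMQ send, so that a rollback puts the message back on the queue.

```sql
-- Sketch only: uspReceiveOneMessage is a hypothetical name.
-- Assumes the caller has already started the (distributed) transaction.
CREATE PROCEDURE uspReceiveOneMessage
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @conversation_handle UNIQUEIDENTIFIER,
            @conversation_group_id UNIQUEIDENTIFIER,
            @message_type_name NVARCHAR(256),
            @message_body XML;

    -- Wait at most 5 seconds (assumed value) for one message, then return.
    WAITFOR
    (
        RECEIVE TOP(1)
            @conversation_handle = conversation_handle,
            @message_type_name = message_type_name,
            @conversation_group_id = conversation_group_id,
            @message_body =
                CASE
                    WHEN validation = 'X' THEN CAST(message_body AS XML)
                    ELSE CAST(N'<none/>' AS XML)
                END
        FROM [dbo].[TargetQueue]
    ), TIMEOUT 5000;

    -- Nothing arrived before the timeout: return no result set.
    IF @@ROWCOUNT = 0
        RETURN;

    -- System messages: end our side of the conversation, nothing to publish.
    IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
        OR @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
        END CONVERSATION @conversation_handle;
        RETURN;
    END;

    -- Application message: hand it to the caller as a single-row result set.
    SELECT 'Conversation Group Id' = @conversation_group_id,
           'Conversation Handle' = @conversation_handle,
           'Message Type Name' = @message_type_name,
           'Message Body' = @message_body;
END
```

The client loop would then call this procedure repeatedly, each call inside its own transaction, and commit only after the message has been handed to the outgoing queue. Bear in mind that Service Broker's poison-message detection disables a queue after five consecutive rollbacks of a RECEIVE, so a persistently failing send needs handling of its own.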

Yazan Diranieh wrote:

> Who ends the dialogs and how? how does the stored proc find the dialog to send on and how does it end it?

I have two stored procs. uspSendMessage updates a database table with new information, then does BEGIN DIALOG, SEND ON CONVERSATION, END CONVERSATION and COMMIT TRAN. uspReceiveMessage, on the other hand, does a RECEIVE (it does not specify a conversation group id, in order to retrieve all data), a SELECT to send the data to the SqlDataReader, END CONVERSATION, and then COMMIT TRAN.

You are doing fire-and-forget, and this will get you into trouble; see http://blogs.msdn.com/remusrusanu/archive/2006/04/06/570578.aspx. Try to end the conversation from the target side first and have the sender end its side in response to the target's EndDialog message.
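A minimal sketch of the initiator side of that pattern follows. It assumes InitiatorService is bound to a queue named InitiatorQueue and uses a hypothetical activation procedure, uspEndInitiatorDialogs; neither name appears in the original post. It also assumes that uspSendMessage no longer calls END CONVERSATION right after SEND, and that the target ends the conversation once it has processed the NewShipperMessage.

```sql
-- Sketch only: uspEndInitiatorDialogs and InitiatorQueue are assumed names.
CREATE PROCEDURE uspEndInitiatorDialogs
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @conversation_handle UNIQUEIDENTIFIER,
            @message_type_name NVARCHAR(256);

    WHILE 1 = 1
    BEGIN
        BEGIN TRANSACTION;

        -- Drain the initiator queue one message at a time.
        WAITFOR
        (
            RECEIVE TOP(1)
                @conversation_handle = conversation_handle,
                @message_type_name = message_type_name
            FROM [dbo].[InitiatorQueue]
        ), TIMEOUT 1000;

        -- Queue is empty: let the activated procedure exit.
        IF @@ROWCOUNT = 0
        BEGIN
            COMMIT TRANSACTION;
            BREAK;
        END;

        -- The target ended the dialog (or it failed): end the initiator side too.
        -- A real handler would probably log the body of an Error message first.
        IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
            OR @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
        BEGIN
            END CONVERSATION @conversation_handle;
        END;

        COMMIT TRANSACTION;
    END
END
GO

-- Run the procedure automatically whenever messages arrive on the initiator queue.
ALTER QUEUE [dbo].[InitiatorQueue]
    WITH ACTIVATION (
        STATUS = ON,
        PROCEDURE_NAME = [dbo].[uspEndInitiatorDialogs],
        MAX_QUEUE_READERS = 1,
        EXECUTE AS OWNER
    );
```

With this in place the sender never ends a dialog before the target has seen the message, so an Error reply is not silently discarded.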

|||

Very informative. Thanks very much.
